Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust joins cost estimation [HZ-2658, HZ-2494] #25249

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
57b9c02
Adjust joins cost estimation
Fly-Style Aug 17, 2023
7d4410f
Fix test
Fly-Style Aug 18, 2023
187c05d
Introduce QueryPlanListener for further optimizer tests
Fly-Style Aug 18, 2023
1775447
Adjust cost model
Fly-Style Aug 18, 2023
48abe08
Brackets
Fly-Style Aug 18, 2023
98ad736
Revert test change back, and actually it is correct
Fly-Style Aug 18, 2023
d1a1b16
Address review comments
Fly-Style Sep 4, 2023
c4bd3ff
Divide processed and produced rows
Fly-Style Sep 5, 2023
83dc9b5
Apply suggestions from code review
Fly-Style Sep 7, 2023
dc4e5dc
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Sep 7, 2023
d003fc7
Adjust costs after the discussion
Fly-Style Sep 8, 2023
f2b9e19
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Sep 8, 2023
bd9efeb
Remove selectivity from cpu calculation
Fly-Style Sep 18, 2023
50fee00
Fix algorithm even better
Fly-Style Sep 18, 2023
6044a69
Update comment
Fly-Style Sep 18, 2023
516ab7b
Fix HazelcastTable ctor workflow
Fly-Style Sep 18, 2023
05ec305
Update hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/phy…
Fly-Style Sep 19, 2023
68def60
Multiply
Fly-Style Sep 28, 2023
b06e92b
Cost readjustment
Fly-Style Sep 28, 2023
4933580
Merge branch 'master' into fix/5.4/hash-join-cost-adj
Fly-Style Oct 4, 2023
fbd5492
Docs and constant adjustments
Fly-Style Oct 4, 2023
ae0bd63
Typos
Fly-Style Oct 4, 2023
f65293f
Revert back rows estimation of IMap; add broadcast factor for hash LE…
Fly-Style Oct 19, 2023
491e5fd
Clarify doc for hash join
Fly-Style Jan 12, 2024
a716a94
Checkstyle
Fly-Style Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ public SqlPlan prepare(OptimizationTask task) {
int memberCount = nodeEngine.getClusterService().getSize(MemberSelectors.DATA_MEMBER_SELECTOR);

OptimizerContext context = OptimizerContext.create(
nodeEngine.getHazelcastInstance(),
task.getSchema(),
task.getSearchPaths(),
task.getArguments(),
Expand Down Expand Up @@ -804,8 +805,7 @@ private PhysicalRel optimize(
logger.fine("After physical opt:\n" + RelOptUtil.toString(physicalRel));
}

PhysicalRel finalPhysicalRel = physicalRel;
queryPlanListeners.forEach(l -> l.onQueryPlanBuilt(finalPhysicalRel));
// TODO[sasha]: capture final physical rel for listeners here.
Fly-Style marked this conversation as resolved.
Show resolved Hide resolved
return physicalRel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.jet.sql.impl;

import com.google.common.collect.ImmutableList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.sql.impl.opt.cost.CostFactory;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdBoundedness;
import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdPrunability;
Expand Down Expand Up @@ -107,14 +108,15 @@ private OptimizerContext(
* @return Context.
*/
public static OptimizerContext create(
HazelcastInstance hz,
SqlCatalog schema,
List<List<String>> searchPaths,
List<Object> arguments,
IMapResolver iMapResolver,
SqlSecurityContext securityContext
) {
// Resolve tables.
HazelcastSchema rootSchema = HazelcastSchemaUtils.createRootSchema(schema);
HazelcastSchema rootSchema = HazelcastSchemaUtils.createRootSchema(hz, schema);

return create(rootSchema, searchPaths, arguments, iMapResolver, securityContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* Our implementation still tracks row count, CPU and network, but it doesn't implement unnecessary methods, has proper
* comparison semantics, and use CPU and network for cost comparison instead row count.
* <p>
* [1] https://issues.apache.org/jira/browse/CALCITE-3956
* [1] <a href="https://issues.apache.org/jira/browse/CALCITE-3956">LINK</a>
*/
public class Cost implements RelOptCost {

Expand All @@ -40,6 +40,31 @@ public class Cost implements RelOptCost {
public static final Cost HUGE = new Cost(Double.MAX_VALUE / 100, Double.MAX_VALUE / 100, Double.MAX_VALUE / 100);
public static final Cost INFINITY = new Cost(Double.MAX_VALUE, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY);

/**
* Multiplier to display hash table building actions:
* - row hash computation; (estimate - 3 ops, the process itself is heavier from CPU ops POV)
* - probe hash table; (estimate - 1 op)
* - walk through hash chain (estimate - 1 op, assuming hash collision may happen)
* - and compare with each element; (estimate - 1 op, assuming hash collision may happen)
* - add the k-v to the table. (estimate - 1 op).
*/
public static final double HASH_JOIN_MULTIPLIER = 7.;

// During the comparison for each right row we do everything, except adding.
public static final double HASH_JOIN_ROW_CMP_MULTIPLIER = HASH_JOIN_MULTIPLIER - 1;

// Most of the time you compare one field from both sides in join condition.
public static final double NLJ_JOIN_ROW_CMP_MULTIPLIER = 1.5;

/**
* Multiplier to display CPU aspect of network broadcast actions (we do not include network cost yet):
* - row serialization; (avg estimate - 4 ops: 1 ops for __key and 3 ops as avg count of value props)
* - write raw data to the socket (estimate - 4 ops)
* - read raw data from the socket (estimate - 2 ops)
* - row deserialization (avg estimate - 4 ops, same as for serialization)
*/
public static final double NETWORK_BROADCAST_FACTOR = 14.;

private final double rows;
private final double cpu;
private final double network;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package com.hazelcast.jet.sql.impl.opt.cost;

import com.hazelcast.config.IndexType;
import com.hazelcast.jet.sql.impl.validate.types.HazelcastTypeUtils;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataTypeField;

/**
* Utility methods for cost estimation.
Expand All @@ -44,6 +41,9 @@ public final class CostUtils {
/** Multiplier for the CPU part of the cost. Assumes 1ns per item. */
public static final double CPU_COST_MULTIPLIER = 1.0d;

/** Estimation for average batch size of NLJ' left side. */
public static final double AVERAGE_NON_EQUI_JOIN_LEFT_BATCH_SIZE = 4.0d;

/** Multiplier for the network part of the cost. Assumes ~10µs per 1Kb that results in ~10ns per byte. */
public static final double NETWORK_COST_MULTIPLIER = CPU_COST_MULTIPLIER * 10;

Expand Down Expand Up @@ -110,14 +110,4 @@ public static Double adjustFilteredRowCount(Double rowCount, Double selectivity)
public static double getProjectCpu(double rowCount, int expressionCount) {
return rowCount * expressionCount;
}

public static int getEstimatedRowWidth(RelNode rel) {
int res = 0;

for (RelDataTypeField field : rel.getRowType().getFieldList()) {
res += HazelcastTypeUtils.toHazelcastType(field.getType()).getTypeFamily().getEstimatedSize();
}

return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.jet.sql.impl.opt.physical;

import com.hazelcast.jet.sql.impl.opt.cost.Cost;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
Expand All @@ -28,7 +29,6 @@
import org.checkerframework.checker.nullness.qual.Nullable;

public class JoinHashPhysicalRel extends JoinPhysicalRel {
private static final double COST_FACTOR = 1.1;

JoinHashPhysicalRel(
RelOptCluster cluster,
Expand Down Expand Up @@ -58,9 +58,36 @@ public Join copy(
return new JoinHashPhysicalRel(getCluster(), traitSet, left, right, conditionExpr, joinType);
}

/**
* Cost calculation of Hash Join relation. It does not rely on children cost.
* <p>
* Hash Join algorithm is a more advanced join algorithm, which builds a hash table for the right
* row set, and then compare each row from the left side. Cost estimation is the following: <ol>
* <li> Processed row count (PR) is L + R because we traverse both sides once per join.
* <li> CPU is R * (hash table build cost) + L * (row comparison cost) + (PR * (row projection cost))
* <li> Also, for the right side, if it is broadcast, we multiply the row comparison
* cost by a (network broadcast factor)</ol>
* <p>
* A perfect estimation must also include memory (occupied by the hash table) and IO costs.
*/
@Override
@Nullable
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
return super.computeSelfCost(planner, mq).multiplyBy(COST_FACTOR);
double leftRowCount = mq.getRowCount(getLeft());
double rightRowCount = mq.getRowCount(getRight());

double projectionCost = getRight().getRowType().getFieldCount();
double processedRowsCount = leftRowCount + rightRowCount;
double networkBroadcastFactor = 1.;

if (joinType == JoinRelType.LEFT) {
networkBroadcastFactor = Cost.NETWORK_BROADCAST_FACTOR;
}

double cpu = leftRowCount * Cost.HASH_JOIN_ROW_CMP_MULTIPLIER
+ rightRowCount * Cost.HASH_JOIN_MULTIPLIER * networkBroadcastFactor
+ mq.getRowCount(this) * projectionCost;

return planner.getCostFactory().makeCost(processedRowsCount, cpu, 0.);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@
import com.google.common.collect.ImmutableList;
import com.hazelcast.jet.sql.impl.HazelcastPhysicalScan;
import com.hazelcast.jet.sql.impl.opt.OptUtils;
import com.hazelcast.jet.sql.impl.opt.cost.Cost;
import com.hazelcast.jet.sql.impl.opt.cost.CostUtils;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.jet.sql.impl.validate.HazelcastSqlOperatorTable;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.util.ImmutableIntList;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -109,6 +115,53 @@ public <V> V accept(CreateDagVisitor<V> visitor) {
return visitor.onNestedLoopJoin(this);
}

/**
* Cost calculation of Nested Loop Join relation.
* <p>
* Nested Loop Join algorithm is a simple join algorithm, where for each left row,
* we are traversing the whole right row set. Cost estimation is the following: <ol>
* <li> L is a count of left side rows.
* <li> R is a count of right side rows.
* <li> PD is a produced row count: L * R * (join selectivity).
* <li> PR is a processed row count: L * k * R, where k is 1 for non-equi-join,
* (join selectivity) ≤ k ≤ 1 for equi-join and 1/R for key lookup.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is "(join selectivity) ≤ k ≤ 1 for equi-join" and "1/R for key lookup" implemented? The formula in the code is double processedRowCount = Math.max(1.0, leftRowCount * rightRowCount);

* <li> CPU cost is estimated as
* PR * (row comparison cost) + PD * (selectivity) * (row join cost) + ((L - 1) * cost of right side scan)
* </ol>
* <p>
* A perfect estimation must also include memory and IO costs.
*/
@Override
@Nullable
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
boolean isEquiJoin = this.analyzeCondition().isEqui();

double leftRowCount = mq.getRowCount(left);
double rightRowCount = mq.getRowCount(right);
double projectionUnitCost = this.getRowType().getFieldCount();

if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
Fly-Style marked this conversation as resolved.
Show resolved Hide resolved
return planner.getCostFactory().makeInfiniteCost();
}

double producedRows = mq.getRowCount(this);
double processedRowCount = Math.max(1.0, leftRowCount * rightRowCount);

double joinComparisonCost = processedRowCount * Cost.NLJ_JOIN_ROW_CMP_MULTIPLIER;
double joinProjectionCost = producedRows * projectionUnitCost;
double rightSideRepetitions = Math.max(.0, leftRowCount - 1);

if (!isEquiJoin) {
// TODO: measure that value for this constant in load tests.
rightSideRepetitions /= CostUtils.AVERAGE_NON_EQUI_JOIN_LEFT_BATCH_SIZE;
}

double rightSideRepetitionsCost = ((Cost) planner.getCost(right, mq)).getCpuInternal() * rightSideRepetitions;
double cpuEstimate = joinComparisonCost + joinProjectionCost + rightSideRepetitionsCost;

return planner.getCostFactory().makeCost(processedRowCount, cpuEstimate, 0);
}

@Override
public Join copy(
RelTraitSet traitSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.hazelcast.jet.sql.impl.schema;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.sql.impl.QueryUtils;
import com.hazelcast.sql.impl.schema.SqlCatalog;
import com.hazelcast.sql.impl.schema.Table;
Expand Down Expand Up @@ -56,7 +57,7 @@ public static HazelcastSchema createCatalog(Schema schema) {
*
* @return Top-level schema.
*/
public static HazelcastSchema createRootSchema(SqlCatalog catalog) {
public static HazelcastSchema createRootSchema(HazelcastInstance hz, SqlCatalog catalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not need hz anymore

// Create schemas.
Map<String, Schema> schemaMap = new HashMap<>();

Expand All @@ -68,11 +69,7 @@ public static HazelcastSchema createRootSchema(SqlCatalog catalog) {
for (Map.Entry<String, Table> tableEntry : currentSchemaEntry.getValue().entrySet()) {
String tableName = tableEntry.getKey();
Table table = tableEntry.getValue();

HazelcastTable convertedTable = new HazelcastTable(
table,
createTableStatistic(table)
);
HazelcastTable convertedTable = new HazelcastTable(table, createTableStatistic(table));

schemaTables.put(tableName, convertedTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
* properties, thus making further optimization more complex.
*/
public class HazelcastTable extends AbstractTable {

private final Table target;
private final Statistic statistic;
private final RexNode filter;
Expand All @@ -101,7 +100,7 @@ public HazelcastTable(Table target, Statistic statistic) {
private HazelcastTable(
Table target,
Statistic statistic,
@Nonnull List<RexNode> projects,
List<RexNode> projects,
@Nullable RelDataType rowType,
@Nullable RexNode filter
) {
Expand Down Expand Up @@ -167,6 +166,7 @@ public Statistic getStatistic() {
}
}

@SuppressWarnings("DataFlowIssue")
public double getTotalRowCount() {
return statistic.getRowCount();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,7 @@ protected static PlanRow parse(String input) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < level; i++) {
builder.append(" ");
}
builder.append(node);
return builder.toString();
return " ".repeat(Math.max(0, level)) + node;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,23 @@

package com.hazelcast.jet.sql.impl.opt.physical;

import com.google.common.collect.ImmutableList;
import com.hazelcast.jet.sql.impl.connector.SqlConnectorCache;
import com.hazelcast.jet.sql.impl.connector.test.TestAbstractSqlConnector;
import com.hazelcast.jet.sql.impl.connector.test.TestStreamSqlConnector;
import com.hazelcast.jet.sql.impl.opt.OptimizerTestSupport;
import com.hazelcast.jet.sql.impl.schema.HazelcastTable;
import com.hazelcast.jet.sql.impl.schema.HazelcastTableStatistic;
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
import com.hazelcast.jet.sql.impl.schema.RelationsStorage;
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.sql.impl.schema.Table;
import com.hazelcast.sql.impl.schema.TableResolver;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import static com.hazelcast.jet.impl.util.Util.getNodeEngine;
import static com.hazelcast.sql.impl.extract.QueryPath.KEY;
import static com.hazelcast.sql.impl.extract.QueryPath.VALUE;
import static com.hazelcast.sql.impl.type.QueryDataType.INT;
import static com.hazelcast.sql.impl.type.QueryDataTypeFamily.INTEGER;
import static com.hazelcast.sql.impl.type.QueryDataTypeFamily.TIMESTAMP;
import static java.util.Arrays.asList;

public class PhysicalJoinTest extends OptimizerTestSupport {
Expand Down Expand Up @@ -93,43 +87,6 @@ public void when_rightChildIsNotTableScan_then_useHashJoin() {
);
}

@Ignore("Support streaming tables with watermarks in OptimizerTestSupport")
@Test
public void when_bothInputsAreStreamScan_then_useS2SJoin() {
String leftStream = "l";
TestStreamSqlConnector.create(
instance().getSql(),
leftStream,
asList("a", "b"),
asList(INTEGER, TIMESTAMP),
row(1, timestamp(1L))
);

String rightStream = "r";
TestStreamSqlConnector.create(
instance().getSql(),
rightStream,
asList("x", "y"),
asList(INTEGER, TIMESTAMP),
row(1, timestamp(1L))
);

assertInstanceOf(TestAbstractSqlConnector.TestTable.class, resolver.getTables().get(0));
assertInstanceOf(TestAbstractSqlConnector.TestTable.class, resolver.getTables().get(1));
HazelcastTable tableLeft = streamingTable(resolver.getTables().get(0));
HazelcastTable tableRight = streamingTable(resolver.getTables().get(1));

String query = "SELECT * FROM l JOIN r ON l.b = r.y";
assertPlan(
optimizePhysical(query, ImmutableList.of(), tableLeft, tableRight).getPhysical(),
plan(
planRow(0, StreamToStreamJoinPhysicalRel.class),
planRow(1, FullScanPhysicalRel.class),
planRow(1, FullScanPhysicalRel.class)
)
);
}

private static HazelcastTable streamingTable(Table table) {
return new HazelcastTable(table, new HazelcastTableStatistic(1));
}
Expand Down
Loading
Loading