diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/CalciteSqlOptimizer.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/CalciteSqlOptimizer.java index 30b778660f47..1355e53dbaee 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/CalciteSqlOptimizer.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/CalciteSqlOptimizer.java @@ -312,6 +312,7 @@ public SqlPlan prepare(OptimizationTask task) { int memberCount = nodeEngine.getClusterService().getSize(MemberSelectors.DATA_MEMBER_SELECTOR); OptimizerContext context = OptimizerContext.create( + nodeEngine.getHazelcastInstance(), task.getSchema(), task.getSearchPaths(), task.getArguments(), @@ -804,8 +805,7 @@ private PhysicalRel optimize( logger.fine("After physical opt:\n" + RelOptUtil.toString(physicalRel)); } - PhysicalRel finalPhysicalRel = physicalRel; - queryPlanListeners.forEach(l -> l.onQueryPlanBuilt(finalPhysicalRel)); + // TODO[sasha]: capture final physical rel for listeners here. return physicalRel; } diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/OptimizerContext.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/OptimizerContext.java index bd6f7c246cfc..94d14872e7e1 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/OptimizerContext.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/OptimizerContext.java @@ -17,6 +17,7 @@ package com.hazelcast.jet.sql.impl; import com.google.common.collect.ImmutableList; +import com.hazelcast.core.HazelcastInstance; import com.hazelcast.jet.sql.impl.opt.cost.CostFactory; import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdBoundedness; import com.hazelcast.jet.sql.impl.opt.metadata.HazelcastRelMdPrunability; @@ -107,6 +108,7 @@ private OptimizerContext( * @return Context. 
*/ public static OptimizerContext create( + HazelcastInstance hz, SqlCatalog schema, List> searchPaths, List arguments, @@ -114,7 +116,7 @@ public static OptimizerContext create( SqlSecurityContext securityContext ) { // Resolve tables. - HazelcastSchema rootSchema = HazelcastSchemaUtils.createRootSchema(schema); + HazelcastSchema rootSchema = HazelcastSchemaUtils.createRootSchema(hz, schema); return create(rootSchema, searchPaths, arguments, iMapResolver, securityContext); } diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/Cost.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/Cost.java index 0580f74d9cd3..427273552e79 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/Cost.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/Cost.java @@ -30,7 +30,7 @@ * Our implementation still tracks row count, CPU and network, but it doesn't implement unnecessary methods, has proper * comparison semantics, and use CPU and network for cost comparison instead row count. *

- * [1] https://issues.apache.org/jira/browse/CALCITE-3956 + * [1] <a href="https://issues.apache.org/jira/browse/CALCITE-3956">LINK</a> */ public class Cost implements RelOptCost { @@ -40,6 +40,31 @@ public class Cost implements RelOptCost { public static final Cost HUGE = new Cost(Double.MAX_VALUE / 100, Double.MAX_VALUE / 100, Double.MAX_VALUE / 100); public static final Cost INFINITY = new Cost(Double.MAX_VALUE, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY); + /** + * Multiplier to display hash table building actions: + * - row hash computation; (estimate - 3 ops, the process itself is heavier from CPU ops POV) + * - probe hash table; (estimate - 1 op) + * - walk through hash chain (estimate - 1 op, assuming hash collision may happen) + * - and compare with each element; (estimate - 1 op, assuming hash collision may happen) + * - add the k-v to the table. (estimate - 1 op). + */ + public static final double HASH_JOIN_MULTIPLIER = 7.; + + // During the comparison for each left (probe-side) row we do everything, except adding. + public static final double HASH_JOIN_ROW_CMP_MULTIPLIER = HASH_JOIN_MULTIPLIER - 1; + + // Most of the time you compare one field from both sides in join condition. 
+ public static final double NLJ_JOIN_ROW_CMP_MULTIPLIER = 1.5; + + /** + * Multiplier to display CPU aspect of network broadcast actions (we do not include network cost yet): + * - row serialization; (avg estimate - 4 ops: 1 ops for __key and 3 ops as avg count of value props) + * - write raw data to the socket (estimate - 4 ops) + * - read raw data from the socket (estimate - 2 ops) + * - row deserialization (avg estimate - 4 ops, same as for serialization) + */ + public static final double NETWORK_BROADCAST_FACTOR = 14.; + private final double rows; private final double cpu; private final double network; diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/CostUtils.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/CostUtils.java index 3eb965237a4d..a69054ad7a4a 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/CostUtils.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/cost/CostUtils.java @@ -17,9 +17,6 @@ package com.hazelcast.jet.sql.impl.opt.cost; import com.hazelcast.config.IndexType; -import com.hazelcast.jet.sql.impl.validate.types.HazelcastTypeUtils; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataTypeField; /** * Utility methods for cost estimation. @@ -44,6 +41,9 @@ public final class CostUtils { /** Multiplier for the CPU part of the cost. Assumes 1ns per item. */ public static final double CPU_COST_MULTIPLIER = 1.0d; + /** Estimation for average batch size of NLJ' left side. */ + public static final double AVERAGE_NON_EQUI_JOIN_LEFT_BATCH_SIZE = 4.0d; + /** Multiplier for the network part of the cost. Assumes ~10µs per 1Kb that results in ~10ns per byte. 
*/ public static final double NETWORK_COST_MULTIPLIER = CPU_COST_MULTIPLIER * 10; @@ -110,14 +110,4 @@ public static Double adjustFilteredRowCount(Double rowCount, Double selectivity) public static double getProjectCpu(double rowCount, int expressionCount) { return rowCount * expressionCount; } - - public static int getEstimatedRowWidth(RelNode rel) { - int res = 0; - - for (RelDataTypeField field : rel.getRowType().getFieldList()) { - res += HazelcastTypeUtils.toHazelcastType(field.getType()).getTypeFamily().getEstimatedSize(); - } - - return res; - } } diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinHashPhysicalRel.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinHashPhysicalRel.java index 26177efb7279..2185f5b66f69 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinHashPhysicalRel.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinHashPhysicalRel.java @@ -16,6 +16,7 @@ package com.hazelcast.jet.sql.impl.opt.physical; +import com.hazelcast.jet.sql.impl.opt.cost.Cost; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; @@ -28,7 +29,6 @@ import org.checkerframework.checker.nullness.qual.Nullable; public class JoinHashPhysicalRel extends JoinPhysicalRel { - private static final double COST_FACTOR = 1.1; JoinHashPhysicalRel( RelOptCluster cluster, @@ -58,9 +58,36 @@ public Join copy( return new JoinHashPhysicalRel(getCluster(), traitSet, left, right, conditionExpr, joinType); } + /** + * Cost calculation of Hash Join relation. It does not rely on children cost. + *

+ * Hash Join algorithm is a more advanced join algorithm, which builds a hash table for the right + * row set, and then compares each row from the left side. Cost estimation is the following:

    + *
  1. Processed row count (PR) is L + R because we traverse both sides once per join. + *
  2. CPU is R * (hash table build cost) + L * (row comparison cost) + (produced row count * (row projection cost)) + *
  3. Also, for the right side, if it is broadcast (assumed for LEFT joins), we multiply the hash table build + * cost by a (network broadcast factor)
+ *

+ * A perfect estimation must also include memory (occupied by the hash table) and IO costs. + */ @Override @Nullable public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { - return super.computeSelfCost(planner, mq).multiplyBy(COST_FACTOR); + double leftRowCount = mq.getRowCount(getLeft()); + double rightRowCount = mq.getRowCount(getRight()); + + double projectionCost = getRight().getRowType().getFieldCount(); + double processedRowsCount = leftRowCount + rightRowCount; + double networkBroadcastFactor = 1.; + + if (joinType == JoinRelType.LEFT) { + networkBroadcastFactor = Cost.NETWORK_BROADCAST_FACTOR; + } + + double cpu = leftRowCount * Cost.HASH_JOIN_ROW_CMP_MULTIPLIER + + rightRowCount * Cost.HASH_JOIN_MULTIPLIER * networkBroadcastFactor + + mq.getRowCount(this) * projectionCost; + + return planner.getCostFactory().makeCost(processedRowsCount, cpu, 0.); } } diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinNestedLoopPhysicalRel.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinNestedLoopPhysicalRel.java index 8ac5d73653e3..87ea49dc7078 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinNestedLoopPhysicalRel.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/opt/physical/JoinNestedLoopPhysicalRel.java @@ -19,19 +19,25 @@ import com.google.common.collect.ImmutableList; import com.hazelcast.jet.sql.impl.HazelcastPhysicalScan; import com.hazelcast.jet.sql.impl.opt.OptUtils; +import com.hazelcast.jet.sql.impl.opt.cost.Cost; +import com.hazelcast.jet.sql.impl.opt.cost.CostUtils; import com.hazelcast.jet.sql.impl.schema.HazelcastTable; import com.hazelcast.jet.sql.impl.validate.HazelcastSqlOperatorTable; import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import 
org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.util.ImmutableIntList; +import org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; import java.util.List; @@ -109,6 +115,53 @@ public V accept(CreateDagVisitor visitor) { return visitor.onNestedLoopJoin(this); } + /** + * Cost calculation of Nested Loop Join relation. + *

+ * Nested Loop Join algorithm is a simple join algorithm, where for each left row, + * we are traversing the whole right row set. Cost estimation is the following:

    + *
  1. L is a count of left side rows. + *
  2. R is a count of right side rows. + *
  3. PD is a produced row count: L * R * (join selectivity). + *
  4. PR is a processed row count: L * k * R, where k is 1 for non-equi-join, + * (join selectivity) ≤ k ≤ 1 for equi-join and 1/R for key lookup. + *
  5. CPU cost is estimated as + * PR * (row comparison cost) + PD * (row projection cost) + ((L - 1) * cost of right side scan) + *
+ *

+ * A perfect estimation must also include memory and IO costs. + */ + @Override + @Nullable + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + boolean isEquiJoin = this.analyzeCondition().isEqui(); + + double leftRowCount = mq.getRowCount(left); + double rightRowCount = mq.getRowCount(right); + double projectionUnitCost = this.getRowType().getFieldCount(); + + if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) { + return planner.getCostFactory().makeInfiniteCost(); + } + + double producedRows = mq.getRowCount(this); + double processedRowCount = Math.max(1.0, leftRowCount * rightRowCount); + + double joinComparisonCost = processedRowCount * Cost.NLJ_JOIN_ROW_CMP_MULTIPLIER; + double joinProjectionCost = producedRows * projectionUnitCost; + double rightSideRepetitions = Math.max(.0, leftRowCount - 1); + + if (!isEquiJoin) { + // TODO: measure that value for this constant in load tests. + rightSideRepetitions /= CostUtils.AVERAGE_NON_EQUI_JOIN_LEFT_BATCH_SIZE; + } + + double rightSideRepetitionsCost = ((Cost) planner.getCost(right, mq)).getCpuInternal() * rightSideRepetitions; + double cpuEstimate = joinComparisonCost + joinProjectionCost + rightSideRepetitionsCost; + + return planner.getCostFactory().makeCost(processedRowCount, cpuEstimate, 0); + } + @Override public Join copy( RelTraitSet traitSet, diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastSchemaUtils.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastSchemaUtils.java index 72434452c6db..a2ecafdc6c9a 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastSchemaUtils.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastSchemaUtils.java @@ -16,6 +16,7 @@ package com.hazelcast.jet.sql.impl.schema; +import com.hazelcast.core.HazelcastInstance; import com.hazelcast.sql.impl.QueryUtils; import com.hazelcast.sql.impl.schema.SqlCatalog; import 
com.hazelcast.sql.impl.schema.Table; @@ -56,7 +57,7 @@ public static HazelcastSchema createCatalog(Schema schema) { * * @return Top-level schema. */ - public static HazelcastSchema createRootSchema(SqlCatalog catalog) { + public static HazelcastSchema createRootSchema(HazelcastInstance hz, SqlCatalog catalog) { // Create schemas. Map schemaMap = new HashMap<>(); @@ -68,11 +69,7 @@ public static HazelcastSchema createRootSchema(SqlCatalog catalog) { for (Map.Entry tableEntry : currentSchemaEntry.getValue().entrySet()) { String tableName = tableEntry.getKey(); Table table = tableEntry.getValue(); - - HazelcastTable convertedTable = new HazelcastTable( - table, - createTableStatistic(table) - ); + HazelcastTable convertedTable = new HazelcastTable(table, createTableStatistic(table)); schemaTables.put(tableName, convertedTable); } diff --git a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastTable.java b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastTable.java index 02b96ea0258f..25abb213796a 100644 --- a/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastTable.java +++ b/hazelcast-sql/src/main/java/com/hazelcast/jet/sql/impl/schema/HazelcastTable.java @@ -83,7 +83,6 @@ * properties, thus making further optimization more complex. 
*/ public class HazelcastTable extends AbstractTable { - private final Table target; private final Statistic statistic; private final RexNode filter; @@ -101,7 +100,7 @@ public HazelcastTable(Table target, Statistic statistic) { private HazelcastTable( Table target, Statistic statistic, - @Nonnull List projects, + List projects, @Nullable RelDataType rowType, @Nullable RexNode filter ) { @@ -167,6 +166,7 @@ public Statistic getStatistic() { } } + @SuppressWarnings("DataFlowIssue") public double getTotalRowCount() { return statistic.getRowCount(); } diff --git a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/OptimizerTestSupport.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/OptimizerTestSupport.java index 0f9f30ad7faf..dabb2309831a 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/OptimizerTestSupport.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/OptimizerTestSupport.java @@ -315,12 +315,7 @@ protected static PlanRow parse(String input) { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < level; i++) { - builder.append(" "); - } - builder.append(node); - return builder.toString(); + return " ".repeat(Math.max(0, level)) + node; } @Override diff --git a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/physical/PhysicalJoinTest.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/physical/PhysicalJoinTest.java index 4c735c8d05b5..1a99787fc947 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/physical/PhysicalJoinTest.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/opt/physical/PhysicalJoinTest.java @@ -16,29 +16,23 @@ package com.hazelcast.jet.sql.impl.opt.physical; -import com.google.common.collect.ImmutableList; import com.hazelcast.jet.sql.impl.connector.SqlConnectorCache; -import com.hazelcast.jet.sql.impl.connector.test.TestAbstractSqlConnector; -import 
com.hazelcast.jet.sql.impl.connector.test.TestStreamSqlConnector; import com.hazelcast.jet.sql.impl.opt.OptimizerTestSupport; import com.hazelcast.jet.sql.impl.schema.HazelcastTable; import com.hazelcast.jet.sql.impl.schema.HazelcastTableStatistic; -import com.hazelcast.jet.sql.impl.schema.TableResolverImpl; import com.hazelcast.jet.sql.impl.schema.RelationsStorage; +import com.hazelcast.jet.sql.impl.schema.TableResolverImpl; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.sql.impl.schema.Table; import com.hazelcast.sql.impl.schema.TableResolver; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; import static com.hazelcast.jet.impl.util.Util.getNodeEngine; import static com.hazelcast.sql.impl.extract.QueryPath.KEY; import static com.hazelcast.sql.impl.extract.QueryPath.VALUE; import static com.hazelcast.sql.impl.type.QueryDataType.INT; -import static com.hazelcast.sql.impl.type.QueryDataTypeFamily.INTEGER; -import static com.hazelcast.sql.impl.type.QueryDataTypeFamily.TIMESTAMP; import static java.util.Arrays.asList; public class PhysicalJoinTest extends OptimizerTestSupport { @@ -93,43 +87,6 @@ public void when_rightChildIsNotTableScan_then_useHashJoin() { ); } - @Ignore("Support streaming tables with watermarks in OptimizerTestSupport") - @Test - public void when_bothInputsAreStreamScan_then_useS2SJoin() { - String leftStream = "l"; - TestStreamSqlConnector.create( - instance().getSql(), - leftStream, - asList("a", "b"), - asList(INTEGER, TIMESTAMP), - row(1, timestamp(1L)) - ); - - String rightStream = "r"; - TestStreamSqlConnector.create( - instance().getSql(), - rightStream, - asList("x", "y"), - asList(INTEGER, TIMESTAMP), - row(1, timestamp(1L)) - ); - - assertInstanceOf(TestAbstractSqlConnector.TestTable.class, resolver.getTables().get(0)); - assertInstanceOf(TestAbstractSqlConnector.TestTable.class, resolver.getTables().get(1)); - HazelcastTable tableLeft = 
streamingTable(resolver.getTables().get(0)); - HazelcastTable tableRight = streamingTable(resolver.getTables().get(1)); - - String query = "SELECT * FROM l JOIN r ON l.b = r.y"; - assertPlan( - optimizePhysical(query, ImmutableList.of(), tableLeft, tableRight).getPhysical(), - plan( - planRow(0, StreamToStreamJoinPhysicalRel.class), - planRow(1, FullScanPhysicalRel.class), - planRow(1, FullScanPhysicalRel.class) - ) - ); - } - private static HazelcastTable streamingTable(Table table) { return new HazelcastTable(table, new HazelcastTableStatistic(1)); } diff --git a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserNameResolutionTest.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserNameResolutionTest.java index b3a86a084f8a..a54543d9dea9 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserNameResolutionTest.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserNameResolutionTest.java @@ -225,6 +225,7 @@ private static OptimizerContext createContext() { List> searchPaths = QueryUtils.prepareSearchPaths(emptyList(), tableResolvers); return OptimizerContext.create( + instance(), new SqlCatalog(tableResolvers), searchPaths, emptyList(), diff --git a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserOperationsTest.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserOperationsTest.java index 4c9f4f237ea5..03c591bcfb6d 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserOperationsTest.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/ParserOperationsTest.java @@ -207,6 +207,7 @@ private static OptimizerContext createContext() { List> searchPaths = QueryUtils.prepareSearchPaths(emptyList(), tableResolvers); return OptimizerContext.create( + instance(), new SqlCatalog(tableResolvers), searchPaths, emptyList(), diff --git 
a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/UnparseTest.java b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/UnparseTest.java index 0284f75bac02..5b94fe12c5c1 100644 --- a/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/UnparseTest.java +++ b/hazelcast-sql/src/test/java/com/hazelcast/jet/sql/impl/parse/UnparseTest.java @@ -92,6 +92,7 @@ private void checkQuery(String query) { private static OptimizerContext createContext() { return OptimizerContext.create( + instance(), new SqlCatalog(emptyList()), emptyList(), emptyList(),