Skip to content
Permalink
Browse files
IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #…
…9987.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
Vladsz83 authored and alex-plekhanov committed May 17, 2022
1 parent b2c6f41 commit 9f138a4744269388d8ae238d6dcdae3c97f9a1ed
Showing 11 changed files with 296 additions and 57 deletions.
@@ -463,7 +463,11 @@ public LogicalRelImplementor(
@Override public Node<Row> visit(IgniteSort rel) {
RelCollation collation = rel.getCollation();

SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation));
Supplier<Integer> offset = (rel.offset == null) ? null : expressionFactory.execute(rel.offset);
Supplier<Integer> fetch = (rel.fetch == null) ? null : expressionFactory.execute(rel.fetch);

SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation), offset,
fetch);

Node<Row> input = visit(rel.getInput());

@@ -16,12 +16,16 @@
*/
package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

/**
* Sort node.
@@ -39,21 +43,54 @@
/** Rows buffer. */
private final PriorityQueue<Row> rows;

/** SQL select limit. Negative if disabled. */
private final int limit;

/** Reverse-ordered rows in case of limited sort. */
private List<Row> reversed;

/**
* @param ctx Execution context.
* @param comp Rows comparator.
* @param offset Offset.
* @param fetch Limit.
*/
public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
public SortNode(
ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp,
@Nullable Supplier<Integer> offset,
@Nullable Supplier<Integer> fetch
) {
super(ctx, rowType);

rows = comp == null ? new PriorityQueue<>() : new PriorityQueue<>(comp);
assert fetch == null || fetch.get() >= 0;
assert offset == null || offset.get() >= 0;

limit = fetch == null ? -1 : fetch.get() + (offset == null ? 0 : offset.get());

if (limit < 0)
rows = new PriorityQueue<>(comp);
else {
rows = new GridBoundedPriorityQueue<>(limit, comp == null ? (Comparator<Row>)Comparator.reverseOrder()
: comp.reversed());
}
}

/**
* Creates a sort node without offset and limit: delegates to the full constructor
* passing {@code null} for both the offset and the fetch suppliers.
*
* @param ctx Execution context.
* @param rowType Rows type.
* @param comp Rows comparator.
*/
public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
this(ctx, rowType, comp, null, null);
}

/** {@inheritDoc} */
@Override protected void rewindInternal() {
    // Drop everything buffered so far: the limited-sort output list (if it was created) and the queue.
    if (reversed != null)
        reversed.clear();

    rows.clear();

    // Reset the request/wait counters.
    waiting = 0;
    requested = 0;
}

/** {@inheritDoc} */
@@ -84,6 +121,7 @@ else if (!inLoop)
@Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
assert reversed == null || reversed.isEmpty();

checkState();

@@ -118,12 +156,31 @@ private void flush() throws Exception {

inLoop = true;
try {
while (requested > 0 && !rows.isEmpty()) {
// Prepare final order (reversed).
if (limit > 0 && !rows.isEmpty()) {
if (reversed == null)
reversed = new ArrayList<>(rows.size());

while (!rows.isEmpty()) {
reversed.add(rows.poll());

if (++processed >= IN_BUFFER_SIZE) {
// Allow the others to do their job.
context().execute(this::flush, this::onError);

return;
}
}

processed = 0;
}

while (requested > 0 && (reversed == null ? !rows.isEmpty() : !reversed.isEmpty())) {
checkState();

requested--;

downstream().push(rows.poll());
downstream().push(reversed == null ? rows.poll() : reversed.remove(reversed.size() - 1));

if (++processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
@@ -133,7 +190,7 @@ private void flush() throws Exception {
}
}

if (rows.isEmpty()) {
if (reversed == null ? rows.isEmpty() : reversed.isEmpty()) {
if (requested > 0)
downstream().end();

@@ -22,6 +22,7 @@
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Minus;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdRowCount;
import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -53,6 +54,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
return rel.estimateRowCount(mq);
}

/**
* {@inheritDoc}
* <p>
* Returns the row count estimated by the sort relation itself, i.e. the result of
* {@code rel.estimateRowCount(mq)}.
*/
@Override public Double getRowCount(Sort rel, RelMetadataQuery mq) {
return rel.estimateRowCount(mq);
}

/** */
@Nullable public static Double joinRowCount(RelMetadataQuery mq, Join rel) {
if (!rel.getJoinType().projectsRight()) {
@@ -43,6 +43,12 @@ public class IgniteCost implements RelOptCost {
/** Cost of a lookup at the hash. */
public static final double HASH_LOOKUP_COST = 10;

/** In case the fetch value is a DYNAMIC_PARAM. */
public static final double FETCH_IS_PARAM_FACTOR = 0.01;

/** In case the offset value is a DYNAMIC_PARAM. */
public static final double OFFSET_IS_PARAM_FACTOR = 0.5;

/**
* With broadcast distribution each row will be sent to each destination node,
* thus the total bytes amount will be a multiple of the destination nodes count.
@@ -30,22 +30,18 @@
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;

import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;

/** */
public class IgniteLimit extends SingleRel implements IgniteRel {
/** In case the fetch value is a DYNAMIC_PARAM. */
private static final double FETCH_IS_PARAM_FACTOR = 0.01;

/** In case the offset value is a DYNAMIC_PARAM. */
private static final double OFFSET_IS_PARAM_FACTOR = 0.5;

/** Offset. */
private final RexNode offset;

@@ -160,23 +156,6 @@ else if (!requiredCollation.satisfies(relCollation))
return Math.min(lim, inputRowCount - off);
}

/**
 * Reads a numeric value from the given expression.
 *
 * @param n Expression to read the value from.
 * @param def Value to return if the expression is not a literal.
 * @return Literal value read as an integer, or {@code def} if the expression is not a literal
 *      or the value can't be extracted.
 */
private double doubleFromRex(RexNode n, double def) {
    try {
        if (!n.isA(SqlKind.LITERAL))
            return def;

        return ((RexLiteral)n).getValueAs(Integer.class);
    }
    catch (Exception ex) {
        // Fail fast when assertions are enabled, fall back to the default otherwise.
        assert false : "Unable to extract value: " + ex.getMessage();

        return def;
    }
}

/**
* @return Offset.
*/
@@ -37,31 +37,68 @@
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;

import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;

/**
* Ignite sort operator.
*/
public class IgniteSort extends Sort implements IgniteRel {
/** */
private final boolean enforcer;

/**
* Constructor. All arguments except {@code enforcer} are passed as is to the base
* {@link Sort} constructor.
*
* @param cluster Cluster.
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
* @param offset Offset expression, {@code null} if there is no offset.
* @param fetch Limit expression, {@code null} if there is no limit.
* @param enforcer Enforcer flag, returned by {@code isEnforcer()}.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
RelCollation collation,
RexNode offset,
RexNode fetch,
boolean enforcer
) {
super(cluster, traits, child, collation, offset, fetch);

this.enforcer = enforcer;
}

/**
* Constructor.
*
* @param cluster Cluster.
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
* @param enforcer Enforcer flag.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
RelCollation collation) {
super(cluster, traits, child, collation);
RelCollation collation,
boolean enforcer
) {
this(cluster, traits, child, collation, null, null, enforcer);
}

/**
* Creates the node from a {@link RelInput}. The input is passed through {@code changeTraits}
* with {@link IgniteConvention#INSTANCE} before it is handed to the base constructor.
* The enforcer flag of a node created this way is always {@code false}.
*
* @param input Relational input to create the node from.
*/
public IgniteSort(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));

// No need to enforce anything on ready, fragmented and sent plan.
enforcer = false;
}

/** {@inheritDoc} */
@@ -72,9 +109,7 @@ public IgniteSort(RelInput input) {
RexNode offset,
RexNode fetch
) {
assert offset == null && fetch == null;

return new IgniteSort(getCluster(), traitSet, newInput, newCollation);
return new IgniteSort(getCluster(), traitSet, newInput, traitSet.getCollation(), offset, fetch, enforcer);
}

/** {@inheritDoc} */
@@ -92,9 +127,13 @@ public IgniteSort(RelInput input) {
if (isEnforcer() || required.getConvention() != IgniteConvention.INSTANCE)
return null;

RelCollation collation = TraitUtils.collation(required);
RelCollation requiredCollation = TraitUtils.collation(required);
RelCollation relCollation = traitSet.getCollation();

return Pair.of(required.replace(collation), ImmutableList.of(required.replace(RelCollations.EMPTY)));
if (!requiredCollation.satisfies(relCollation))
return null;

return Pair.of(required, ImmutableList.of(required.replace(RelCollations.EMPTY)));
}

/** {@inheritDoc} */
@@ -107,16 +146,24 @@ public IgniteSort(RelInput input) {
return Pair.of(childTraits.replace(collation()), ImmutableList.of(childTraits));
}

/** {@inheritDoc} */
@Override public double estimateRowCount(RelMetadataQuery mq) {
    // The estimate is the number of rows kept in memory for the estimated input size.
    double inputRows = mq.getRowCount(getInput());

    return memRows(inputRows);
}

/** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
double rows = mq.getRowCount(getInput());
double inputRows = mq.getRowCount(getInput());

double memRows = memRows(inputRows);

double cpuCost = rows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogN(rows) * IgniteCost.ROW_COMPARISON_COST;
double memory = rows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
double cpuCost = inputRows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogM(inputRows, memRows)
* IgniteCost.ROW_COMPARISON_COST;
double memory = memRows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;

IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();

RelOptCost cost = costFactory.makeCost(rows, cpuCost, 0, memory, 0);
RelOptCost cost = costFactory.makeCost(inputRows, cpuCost, 0, memory, 0);

// Distributed sorting is more preferable than sorting on the single node.
if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
@@ -127,6 +174,21 @@ public IgniteSort(RelInput input) {

/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation, offset, fetch, enforcer);
}

/**
* {@inheritDoc}
*
* @return Value of the enforcer flag this node was created with.
*/
@Override public boolean isEnforcer() {
return enforcer;
}

/**
 * Rows number to keep in memory and sort.
 *
 * @param inputRows Estimated number of input rows.
 * @return The smaller of {@code inputRows} and the sum of the fetch and offset estimates.
 */
private double memRows(double inputRows) {
    // Without a fetch the whole input is kept.
    double fetchRows = inputRows;

    if (fetch != null)
        fetchRows = doubleFromRex(fetch, inputRows * FETCH_IS_PARAM_FACTOR);

    // Without an offset nothing is added.
    double offsetRows = 0;

    if (offset != null)
        offsetRows = doubleFromRex(offset, inputRows * OFFSET_IS_PARAM_FACTOR);

    return Math.min(inputRows, fetchRows + offsetRows);
}
}

0 comments on commit 9f138a4

Please sign in to comment.