Skip to content

Commit

Permalink
IGNITE-18569 Fix planning with MRSortAggregate - Fixes apache#1603.
Browse files Browse the repository at this point in the history
Signed-off-by: zstan <stanilovsky@gmail.com>
  • Loading branch information
zstan authored and lowka committed Mar 18, 2023
1 parent 698dd09 commit 2e48d47
Show file tree
Hide file tree
Showing 19 changed files with 369 additions and 303 deletions.

Large diffs are not rendered by default.

This file was deleted.

Expand Up @@ -712,13 +712,19 @@ public Node<RowT> visit(IgniteMapSortAggregate rel) {

RowFactory<RowT> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);

Comparator<RowT> comp = expressionFactory.comparator(rel.collation());

if (rel.getGroupSet().isEmpty() && comp == null) {
comp = (k1, k2) -> 0;
}

SortAggregateNode<RowT> node = new SortAggregateNode<>(
ctx,
type,
rel.getGroupSet(),
accFactory,
rowFactory,
expressionFactory.comparator(rel.collation())
comp
);

Node<RowT> input = visit(rel.getInput());
Expand All @@ -743,13 +749,19 @@ public Node<RowT> visit(IgniteReduceSortAggregate rel) {

RowFactory<RowT> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);

Comparator<RowT> comp = expressionFactory.comparator(rel.collation());

if (rel.getGroupSet().isEmpty() && comp == null) {
comp = (k1, k2) -> 0;
}

SortAggregateNode<RowT> node = new SortAggregateNode<>(
ctx,
type,
rel.getGroupSet(),
accFactory,
rowFactory,
expressionFactory.comparator(rel.collation())
comp
);

Node<RowT> input = visit(rel.getInput());
Expand Down
Expand Up @@ -66,7 +66,13 @@ public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements Singl

/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
* @param ctx Execution context.
* @param type Aggregation operation (phase) type.
* @param grpSet Bit set of grouping fields.
* @param accFactory Accumulators.
* @param rowFactory Row factory.
* @param comp Comparator.
*/
public SortAggregateNode(
ExecutionContext<RowT> ctx,
Expand Down
Expand Up @@ -198,7 +198,7 @@ public ColocationGroup colocate(ColocationGroup other) throws ColocationMappingE
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public ColocationGroup finalaze() {
public ColocationGroup complete() {
if (assignments == null && nodeNames == null) {
return this;
}
Expand Down
Expand Up @@ -156,7 +156,7 @@ public FragmentMapping finalize(Supplier<List<String>> nodesSource) {

List<ColocationGroup> colocationGroups = this.colocationGroups;

colocationGroups = Commons.transform(colocationGroups, ColocationGroup::finalaze);
colocationGroups = Commons.transform(colocationGroups, ColocationGroup::complete);

List<String> nodes = nodeNames();
List<String> nodes0 = nodes.isEmpty() ? nodesSource.get() : nodes;
Expand Down
Expand Up @@ -38,7 +38,7 @@
/**
* Relational operator that returns the hashed contents of a table and allow to lookup rows by specified keys.
*/
public class IgniteHashIndexSpool extends AbstractIgniteSpool implements InternalIgniteRel {
public class IgniteHashIndexSpool extends AbstractIgniteSpool {
/** Search row. */
private final List<RexNode> searchRow;

Expand Down
Expand Up @@ -40,7 +40,7 @@
*/
public abstract class IgniteColocatedAggregateBase extends IgniteAggregate implements TraitsAwareIgniteRel {
/** {@inheritDoc} */
protected IgniteColocatedAggregateBase(
IgniteColocatedAggregateBase(
RelOptCluster cluster,
RelTraitSet traitSet,
RelNode input,
Expand All @@ -51,8 +51,12 @@ protected IgniteColocatedAggregateBase(
super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
}

/** {@inheritDoc} */
protected IgniteColocatedAggregateBase(RelInput input) {
/**
* Constructor used for deserialization.
*
* @param input Serialized representation.
*/
IgniteColocatedAggregateBase(RelInput input) {
super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
}

Expand Down
Expand Up @@ -41,6 +41,11 @@ public IgniteColocatedHashAggregate(RelOptCluster cluster, RelTraitSet traitSet,
super(cluster, traitSet, input, groupSet, groupSets, aggCalls);
}

/**
* Constructor used for deserialization.
*
* @param input Serialized representation.
*/
public IgniteColocatedHashAggregate(RelInput input) {
super(input);
}
Expand Down
Expand Up @@ -61,8 +61,9 @@ public IgniteColocatedSortAggregate(
}

/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
* Constructor used for deserialization.
*
* @param input Serialized representation.
*/
public IgniteColocatedSortAggregate(RelInput input) {
super(input);
Expand Down
Expand Up @@ -53,17 +53,6 @@ protected IgniteMapAggregateBase(RelInput input) {
super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
}

/** {@inheritDoc} */
@Override
public Pair<RelTraitSet, List<RelTraitSet>> passThroughDistribution(RelTraitSet nodeTraits,
List<RelTraitSet> inTraits) {
if (TraitUtils.distribution(nodeTraits).satisfies(IgniteDistributions.single())) {
return null;
} else {
return TraitsAwareIgniteRel.super.passThroughDistribution(nodeTraits, inTraits);
}
}

/** {@inheritDoc} */
@Override
public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
Expand Down
Expand Up @@ -58,8 +58,9 @@ public IgniteMapHashAggregate(
}

/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
* Constructor used for deserialization.
*
* @param input Serialized representation.
*/
public IgniteMapHashAggregate(RelInput input) {
super(input);
Expand Down
Expand Up @@ -64,22 +64,21 @@ public IgniteMapSortAggregate(
super(cluster, traitSet, input, groupSet, groupSets, aggCalls);

assert Objects.nonNull(collation);
assert !collation.isDefault();

this.collation = collation;
}

/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
* Constructor used for deserialization.
*
* @param input Serialized representation.
*/
public IgniteMapSortAggregate(RelInput input) {
super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));

collation = input.getCollation();

assert Objects.nonNull(collation);
assert !collation.isDefault();
}

/** {@inheritDoc} */
Expand Down

0 comments on commit 2e48d47

Please sign in to comment.