Skip to content

Commit

Permalink
Merge be31078 into 27a569d
Browse files Browse the repository at this point in the history
  • Loading branch information
mfelsche committed Mar 17, 2015
2 parents 27a569d + be31078 commit 23c4033
Show file tree
Hide file tree
Showing 19 changed files with 508 additions and 66 deletions.
1 change: 0 additions & 1 deletion sql/src/main/java/io/crate/analyze/OrderBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

package io.crate.analyze;

import com.google.common.collect.ImmutableList;
import io.crate.planner.symbol.Symbol;

import java.util.List;
Expand Down
14 changes: 12 additions & 2 deletions sql/src/main/java/io/crate/operation/merge/MergeOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,18 @@ public PageDownstream getAndInitPageDownstream(MergeNode mergeNode,
ResultProvider resultProvider,
RamAccountingContext ramAccountingContext,
Optional<Executor> executorOptional) {
BucketMerger bucketMerger = new NonSortingBucketMerger(executorOptional);

BucketMerger bucketMerger;
if (mergeNode.sortedInputOutput()) {
bucketMerger = new SortingBucketMerger(
mergeNode.numUpstreams(),
mergeNode.orderByIndices(),
mergeNode.reverseFlags(),
mergeNode.nullsFirst(),
executorOptional
);
} else {
bucketMerger = new NonSortingBucketMerger(executorOptional);
}
FlatProjectorChain flatProjectorChain = new FlatProjectorChain(mergeNode.projections(),
this.projectionToProjectorVisitor,
ramAccountingContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.carrotsearch.hppc.cursors.IntCursor;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
Expand Down Expand Up @@ -71,7 +72,7 @@ public SortingBucketMerger(int numBuckets,
this.executor = executor;
List<Comparator<Row>> comparators = new ArrayList<>(orderByPositions.length);
for (int i = 0; i < orderByPositions.length; i++) {
comparators.add(OrderingByPosition.rowOrdering(i, reverseFlags[i], nullsFirst[i]));
comparators.add(OrderingByPosition.rowOrdering(orderByPositions[i], reverseFlags[i], nullsFirst[i]));
}
ordering = Ordering.compound(comparators);
wantMore = new AtomicBoolean(true);
Expand Down Expand Up @@ -319,18 +320,25 @@ private boolean emit(Row row) {

@Override
public void finish() {
ArrayList<Iterator<Row>> bucketIts = new ArrayList<>(remainingBucketIts.length);
for (Iterator<Row> bucketIt : remainingBucketIts) {
if (bucketIt == null) {
bucketIts.add(Collections.<Row>emptyIterator());
} else {
bucketIts.add(bucketIt);
while (hasReminaingBucketIts()) {
ArrayList<Iterator<Row>> bucketIts = new ArrayList<>(remainingBucketIts.length);
for (Iterator<Row> bucketIt : remainingBucketIts) {
if (bucketIt == null) {
bucketIts.add(Collections.<Row>emptyIterator());
} else {
bucketIts.add(bucketIt);
}
}
emitBuckets(bucketIts);
}
emitBuckets(bucketIts);
downstream.finish();
}

@SuppressWarnings("unchecked")
private boolean hasReminaingBucketIts() {
return Iterators.any(Iterators.forArray(remainingBucketIts), Predicates.notNull());
}

@Override
public void fail(Throwable e) {
downstream.fail(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public int compare(@Nullable Object[] left, @Nullable Object[] right) {
protected final int position;
protected final Ordering<Comparable> ordering;

private OrderingByPosition (int position, boolean reverse, Boolean nullFirst) {
private OrderingByPosition (int position, boolean reverse, @Nullable Boolean nullFirst) {
this.position = position;

// note, that we are reverse for the queue so this conditional is by intent
Expand Down
43 changes: 41 additions & 2 deletions sql/src/main/java/io/crate/planner/PlanNodeBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@

package io.crate.planner;

import com.google.common.base.MoreObjects;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.crate.analyze.OrderBy;
import io.crate.analyze.WhereClause;
import io.crate.metadata.PartitionName;
import io.crate.metadata.Routing;
import io.crate.metadata.table.TableInfo;
import io.crate.planner.consumer.OrderByPositionVisitor;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.DQLPlanNode;
import io.crate.planner.node.dql.MergeNode;
Expand All @@ -38,7 +41,10 @@
import io.crate.planner.symbol.Symbols;

import javax.annotation.Nullable;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PlanNodeBuilder {

Expand Down Expand Up @@ -71,13 +77,46 @@ public static MergeNode distributedMerge(CollectNode collectNode,
}

public static MergeNode localMerge(List<Projection> projections,
DQLPlanNode previousNode) {
DQLPlanNode previousNode) {
MergeNode node = new MergeNode("localMerge", previousNode.executionNodes().size());
node.projections(projections);
connectTypes(previousNode, node);
return node;
}

/**
* Create a MergeNode which uses a {@link io.crate.operation.merge.SortingBucketMerger}
* as it expects sorted input and produces sorted output.
*
* @param projections the projections to include in the resulting MergeNode
* @param orderBy {@linkplain io.crate.analyze.OrderBy} containing sorting parameters
* @param sourceSymbols the input symbols for this mergeNode
* @param orderBySymbols the symbols to sort on. If this is null,
* {@linkplain io.crate.analyze.OrderBy#orderBySymbols()}
* will be used
* @param previousNode the previous planNode to derive inputtypes from
*/
public static MergeNode sortedLocalMerge(List<Projection> projections,
OrderBy orderBy,
List<Symbol> sourceSymbols,
@Nullable List<Symbol> orderBySymbols,
DQLPlanNode previousNode) {
int[] orderByIndices = OrderByPositionVisitor.orderByPositions(
MoreObjects.firstNonNull(orderBySymbols, orderBy.orderBySymbols()),
sourceSymbols
);
MergeNode node = MergeNode.sortedMergeNode(
"sortedLocalMerge",
previousNode.executionNodes().size(),
orderByIndices,
orderBy.reverseFlags(),
orderBy.nullsFirst()
);
node.projections(projections);
connectTypes(previousNode, node);
return node;
}

/**
* calculates the outputTypes using the projections and input types.
* must be called after projections have been set.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.planner.consumer;

import com.carrotsearch.hppc.IntArrayList;
import io.crate.planner.symbol.*;
import org.elasticsearch.common.inject.Singleton;

import java.util.List;

/**
* Extract 0-based integer positions for order by symbols.
*
* This can only be used under the following restriction:
* <ul>
* <li>if an <code>orderBySymbol</code> is no input column with explicit index,
* it must be part of <code>sourceSymbols</code>.
*/
@Singleton
public class OrderByPositionVisitor extends SymbolVisitor<OrderByPositionVisitor.Context, Void> {

private static OrderByPositionVisitor INSTANCE = new OrderByPositionVisitor();

public static class Context {
final List<Symbol> sourceSymbols;
IntArrayList orderByPositions;

public Context(List<Symbol> sourceSymbols) {
this.sourceSymbols = sourceSymbols;
this.orderByPositions = new IntArrayList();
}

public int[] orderByPositions() {
return orderByPositions.toArray();
}
}

private OrderByPositionVisitor() {
}

public static int[] orderByPositions(List<Symbol> orderBySymbols, List<Symbol> sourceSymbols) {
Context context = new Context(sourceSymbols);
for (Symbol orderBySymbol : orderBySymbols) {
INSTANCE.process(orderBySymbol, context);
}
return context.orderByPositions();
}

@Override
public Void visitInputColumn(InputColumn inputColumn, Context context) {
context.orderByPositions.add(inputColumn.index());
return null;
}

@Override
protected Void visitSymbol(Symbol symbol, Context context) {
int idx = context.sourceSymbols.indexOf(symbol);
if (idx >= 0) {
context.orderByPositions.add(idx);
} else {
throw new IllegalArgumentException(SymbolFormatter.format("Cannot sort by: %s - not part of source symbols", symbol));
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,16 +207,23 @@ private AnalyzedRelation normalSelect(QueriedTable table,
tnp.outputs(allOutputs);
collectNode = PlanNodeBuilder.collect(tableInfo, whereClause, toCollect, ImmutableList.<Projection>of(tnp));

// MERGE
tnp = new TopNProjection(limit, querySpec.offset());
tnp.outputs(finalOutputs);
if (orderBy == null) {
tnp = new TopNProjection(limit, querySpec.offset());
// no sorting needed
mergeNode = PlanNodeBuilder.localMerge(ImmutableList.<Projection>of(tnp), collectNode);
} else {
tnp = new TopNProjection(limit, querySpec.offset(),
// no order by needed in TopN as we already sorted on collector
// and we merge sorted with SortedBucketMerger
mergeNode = PlanNodeBuilder.sortedLocalMerge(
ImmutableList.<Projection>of(tnp),
orderBy,
allOutputs,
orderByInputColumns,
orderBy.reverseFlags(),
orderBy.nullsFirst());
collectNode
);
}
tnp.outputs(finalOutputs);
mergeNode = PlanNodeBuilder.localMerge(ImmutableList.<Projection>of(tnp), collectNode);
} else {
collectNode = PlanNodeBuilder.collect(tableInfo, whereClause, outputSymbols, ImmutableList.<Projection>of());
mergeNode = PlanNodeBuilder.localMerge(ImmutableList.<Projection>of(), collectNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ private AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, T
projections.add(groupProjection);

HavingClause havingClause = table.querySpec().having();
if(havingClause != null){
if (havingClause != null) {
if (havingClause.noMatch()) {
return new NoopPlannedAnalyzedRelation(table);
} else if (havingClause.hasQuery()){
} else if (havingClause.hasQuery()) {
FilterProjection fp = projectionBuilder.filterProjection(
collectOutputs,
havingClause.query()
Expand All @@ -180,7 +180,7 @@ private AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, T
collectOutputs.containsAll(table.querySpec().outputs());
boolean collectorTopN = table.querySpec().limit() != null || table.querySpec().offset() > 0 || !outputsMatch;

if(collectorTopN) {
if (collectorTopN) {
projections.add(projectionBuilder.topNProjection(
collectOutputs,
orderBy,
Expand All @@ -196,24 +196,36 @@ private AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, T
splitPoints.leaves(),
ImmutableList.copyOf(projections)
);

// handler
List<Projection> handlerProjections = new ArrayList<>();
if (!ignoreSorting) {
List<Symbol> inputs;
if(collectorTopN){
inputs = table.querySpec().outputs();
} else {
inputs = collectOutputs;
}
handlerProjections.add(projectionBuilder.topNProjection(
inputs,
orderBy,
table.querySpec().offset(),
firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT),
table.querySpec().outputs()
));
MergeNode localMergeNode;
if (!ignoreSorting && collectorTopN && orderBy != null && orderBy.isSorted()) {
// handler receives sorted results from collectnodes
// we can do the sorting with a sorting bucket merger
localMergeNode = PlanNodeBuilder.sortedLocalMerge(handlerProjections, orderBy, table.querySpec().outputs(), null, collectNode);
handlerProjections.add(
projectionBuilder.topNProjection(
table.querySpec().outputs(),
null, // omit order by
table.querySpec().offset(),
firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT),
table.querySpec().outputs()
)
);
} else {
handlerProjections.add(
projectionBuilder.topNProjection(
collectorTopN ? table.querySpec().outputs() : collectOutputs,
orderBy,
table.querySpec().offset(),
firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT),
table.querySpec().outputs()
)
);
// fallback - unsorted local merge
localMergeNode = PlanNodeBuilder.localMerge(handlerProjections, collectNode);
}
MergeNode localMergeNode = PlanNodeBuilder.localMerge(handlerProjections, collectNode);
return new NonDistributedGroupBy(collectNode, localMergeNode);
}

Expand Down
Loading

0 comments on commit 23c4033

Please sign in to comment.