Skip to content

Commit

Permalink
use ProjectionBuilder for NonDistributedGroupBy
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Bogensberger committed Feb 10, 2015
1 parent 74ebcf6 commit 1808c0e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.crate.analyze.relations.AnalyzedRelationVisitor;
import io.crate.analyze.relations.TableRelation;
import io.crate.analyze.where.WhereClauseAnalyzer;
import io.crate.analyze.where.WhereClauseContext;
import io.crate.exceptions.VersionInvalidException;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.Functions;
Expand Down Expand Up @@ -152,7 +151,7 @@ private AnalyzedRelation groupBy(QueriedTable table, TableRelation tableRelation
@Nullable ColumnIndexWriterProjection indexWriterProjection, @Nullable Functions functions){
TableInfo tableInfo = tableRelation.tableInfo();
if (tableInfo.schemaInfo().systemSchema() || !GroupByConsumer.requiresDistribution(tableInfo, tableInfo.getRouting(table.querySpec().where(), null))) {
return NonDistributedGroupByConsumer.nonDistributedGroupBy(table, whereClause, indexWriterProjection);
return NonDistributedGroupByConsumer.nonDistributedGroupBy(table, indexWriterProjection);
} else if (groupedByClusteredColumnOrPrimaryKeys(table, tableRelation)) {
return ReduceOnCollectorGroupByConsumer.optimizedReduceOnCollectorGroupBy(table, tableRelation, whereClause, indexWriterProjection);
} else if (indexWriterProjection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*/
package io.crate.planner.consumer;

import io.crate.Constants;
import com.google.common.collect.ImmutableList;
import io.crate.analyze.*;
import io.crate.analyze.relations.AnalyzedRelation;
import io.crate.analyze.relations.AnalyzedRelationVisitor;
Expand All @@ -29,20 +29,23 @@
import io.crate.metadata.Routing;
import io.crate.metadata.table.TableInfo;
import io.crate.planner.PlanNodeBuilder;
import io.crate.planner.PlannerContextBuilder;
import io.crate.planner.node.NoopPlannedAnalyzedRelation;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.GroupByConsumer;
import io.crate.planner.node.dql.MergeNode;
import io.crate.planner.node.dql.NonDistributedGroupBy;
import io.crate.planner.projection.*;
import io.crate.planner.projection.ColumnIndexWriterProjection;
import io.crate.planner.projection.GroupProjection;
import io.crate.planner.projection.Projection;
import io.crate.planner.projection.builder.ProjectionBuilder;
import io.crate.planner.projection.builder.SplitPoints;
import io.crate.planner.symbol.Aggregation;
import io.crate.planner.symbol.Symbol;
import org.elasticsearch.common.Nullable;

import java.util.ArrayList;
import java.util.List;

import static com.google.common.base.MoreObjects.firstNonNull;

public class NonDistributedGroupByConsumer implements Consumer {

private final Visitor visitor;
Expand Down Expand Up @@ -100,7 +103,7 @@ public AnalyzedRelation visitQueriedTable(QueriedTable table, Context context) {
}

context.result = true;
return nonDistributedGroupBy(table, whereClause, null);
return nonDistributedGroupBy(table, null);
}

@Override
Expand All @@ -124,70 +127,75 @@ protected AnalyzedRelation visitAnalyzedRelation(AnalyzedRelation relation, Cont
* LocalMerge ( GroupProjection PARTIAL -> FINAL, [FilterProjection], [TopN], IndexWriterProjection )
*/
public static AnalyzedRelation nonDistributedGroupBy(QueriedTable table,
WhereClause whereClause,
@Nullable ColumnIndexWriterProjection indexWriterProjection) {
boolean ignoreSorting = indexWriterProjection != null;
TableInfo tableInfo = table.tableRelation().tableInfo();

GroupByConsumer.validateGroupBySymbols(table.tableRelation(), table.querySpec().groupBy());
List<Symbol> groupBy = table.tableRelation().resolve(table.querySpec().groupBy());
int numAggregationSteps = 2;
List<Symbol> groupBy = table.querySpec().groupBy();

PlannerContextBuilder contextBuilder =
new PlannerContextBuilder(numAggregationSteps, groupBy, ignoreSorting)
.output(table.tableRelation().resolve(table.querySpec().outputs()))
.orderBy(table.tableRelation().resolveAndValidateOrderBy(table.querySpec().orderBy()));
ProjectionBuilder projectionBuilder = new ProjectionBuilder(table.querySpec());
SplitPoints splitPoints = projectionBuilder.getSplitPoints();

HavingClause havingClause = table.querySpec().having();
Symbol havingQuery = null;
if (havingClause != null) {
if (havingClause.noMatch()) {
return new NoopPlannedAnalyzedRelation(table);
} else if (havingClause.hasQuery()){
havingQuery = contextBuilder.having(havingClause.query());
}
}
// mapper / collect
GroupProjection groupProjection =
new GroupProjection(contextBuilder.groupBy(), contextBuilder.aggregations());
contextBuilder.addProjection(groupProjection);
GroupProjection groupProjection = projectionBuilder.groupProjection(
splitPoints.leaves(),
table.querySpec().groupBy(),
splitPoints.aggregates(),
Aggregation.Step.ITER,
Aggregation.Step.PARTIAL);

CollectNode collectNode = PlanNodeBuilder.collect(
tableInfo,
whereClause,
contextBuilder.toCollect(),
contextBuilder.getAndClearProjections()
table.querySpec().where(),
splitPoints.leaves(),
ImmutableList.<Projection>of(groupProjection)
);

// handler
contextBuilder.nextStep();
Projection handlerGroupProjection = new GroupProjection(contextBuilder.groupBy(), contextBuilder.aggregations());
contextBuilder.addProjection(handlerGroupProjection);
if (havingQuery != null) {
FilterProjection fp = new FilterProjection(havingQuery);
fp.outputs(contextBuilder.genInputColumns(handlerGroupProjection.outputs(), handlerGroupProjection.outputs().size()));
contextBuilder.addProjection(fp);
List<Symbol> collectOutputs = new ArrayList<>(
groupBy.size() +
splitPoints.aggregates().size());
collectOutputs.addAll(groupBy);
collectOutputs.addAll(splitPoints.aggregates());


OrderBy orderBy = table.querySpec().orderBy();
if (orderBy != null) {
table.tableRelation().validateOrderBy(orderBy);
}
if (!ignoreSorting) {
OrderBy orderBy = table.querySpec().orderBy();
int limit = firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT);
TopNProjection topN;
if (orderBy == null) {
topN = new TopNProjection(limit, table.querySpec().offset());

} else {
topN = new TopNProjection(limit, table.querySpec().offset(),
contextBuilder.orderBy(),
orderBy.reverseFlags(),
orderBy.nullsFirst());

List<Projection> projections = new ArrayList<>();
projections.add(projectionBuilder.groupProjection(
collectOutputs,
table.querySpec().groupBy(),
splitPoints.aggregates(),
Aggregation.Step.PARTIAL,
Aggregation.Step.FINAL
));

HavingClause havingClause = table.querySpec().having();
if (havingClause != null) {
if (havingClause.noMatch()) {
return new NoopPlannedAnalyzedRelation(table);
} else if (havingClause.hasQuery()){
projections.add(projectionBuilder.filterProjection(
collectOutputs,
havingClause.query()
));
}
topN.outputs(contextBuilder.outputs());
contextBuilder.addProjection(topN);
}
if (indexWriterProjection != null) {
contextBuilder.addProjection(indexWriterProjection);

if (indexWriterProjection != null){
projections.add(indexWriterProjection);
} else {
projections.add(projectionBuilder.topNProjection(
collectOutputs,
orderBy,
table.querySpec().offset(),
table.querySpec().limit(),
table.querySpec().outputs()
));
}
MergeNode localMergeNode = PlanNodeBuilder.localMerge(contextBuilder.getAndClearProjections(), collectNode);
MergeNode localMergeNode = PlanNodeBuilder.localMerge(projections, collectNode);
return new NonDistributedGroupBy(collectNode, localMergeNode);
}

Expand Down

0 comments on commit 1808c0e

Please sign in to comment.