Skip to content

Commit

Permalink
use ProjectionBuilder in InsertFromSubQueryConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Bogensberger committed Feb 11, 2015
1 parent b85965a commit b8c4ff9
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package io.crate.planner.consumer;


import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.crate.Constants;
import io.crate.analyze.*;
Expand All @@ -35,18 +36,24 @@
import io.crate.metadata.Routing;
import io.crate.metadata.table.TableInfo;
import io.crate.planner.PlanNodeBuilder;
import io.crate.planner.PlannerContextBuilder;
import io.crate.planner.node.NoopPlannedAnalyzedRelation;
import io.crate.planner.node.dql.CollectNode;
import io.crate.planner.node.dql.DistributedGroupBy;
import io.crate.planner.node.dql.GroupByConsumer;
import io.crate.planner.node.dql.MergeNode;
import io.crate.planner.projection.*;
import io.crate.planner.projection.ColumnIndexWriterProjection;
import io.crate.planner.projection.GroupProjection;
import io.crate.planner.projection.Projection;
import io.crate.planner.projection.builder.ProjectionBuilder;
import io.crate.planner.projection.builder.SplitPoints;
import io.crate.planner.symbol.Aggregation;
import io.crate.planner.symbol.Reference;
import io.crate.planner.symbol.Symbol;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.ImmutableSettings;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import static com.google.common.base.MoreObjects.firstNonNull;
Expand Down Expand Up @@ -153,7 +160,7 @@ private AnalyzedRelation groupBy(QueriedTable table, TableRelation tableRelation
if (tableInfo.schemaInfo().systemSchema() || !GroupByConsumer.requiresDistribution(tableInfo, tableInfo.getRouting(table.querySpec().where(), null))) {
return NonDistributedGroupByConsumer.nonDistributedGroupBy(table, indexWriterProjection);
} else if (groupedByClusteredColumnOrPrimaryKeys(table, tableRelation)) {
return ReduceOnCollectorGroupByConsumer.optimizedReduceOnCollectorGroupBy(table, tableRelation, whereClause, indexWriterProjection);
return ReduceOnCollectorGroupByConsumer.optimizedReduceOnCollectorGroupBy(table, tableRelation, indexWriterProjection);
} else if (indexWriterProjection != null) {
return distributedWriterGroupBy(table, tableRelation, whereClause, indexWriterProjection, functions);
} else {
Expand All @@ -179,108 +186,94 @@ private static AnalyzedRelation distributedWriterGroupBy(QueriedTable table,
WhereClause whereClause,
Projection writerProjection,
Functions functions) {
boolean ignoreSorting = !table.querySpec().isLimited();
GroupByConsumer.validateGroupBySymbols(tableRelation, table.querySpec().groupBy());
List<Symbol> groupBy = tableRelation.resolve(table.querySpec().groupBy());
List<Symbol> groupBy = table.querySpec().groupBy();

tableRelation.validateOrderBy(table.querySpec().orderBy());

PlannerContextBuilder contextBuilder = new PlannerContextBuilder(2, groupBy, ignoreSorting)
.output(tableRelation.resolve(table.querySpec().outputs()))
.orderBy(tableRelation.resolveAndValidateOrderBy(table.querySpec().orderBy()));
ProjectionBuilder projectionBuilder = new ProjectionBuilder(table.querySpec());
SplitPoints splitPoints = projectionBuilder.getSplitPoints();

HavingClause havingClause = table.querySpec().having();
Symbol havingQuery = null;
if(havingClause != null){
if (havingClause.noMatch()) {
return new NoopPlannedAnalyzedRelation(table);
} else if (havingClause.hasQuery()){
havingQuery = contextBuilder.having(havingClause.query());
}
}
GroupProjection groupProjection = projectionBuilder.groupProjection(
splitPoints.leaves(),
table.querySpec().groupBy(),
splitPoints.aggregates(),
Aggregation.Step.ITER,
Aggregation.Step.PARTIAL);
TableInfo tableInfo = tableRelation.tableInfo();
Routing routing = tableInfo.getRouting(whereClause, null);

// collector
contextBuilder.addProjection(new GroupProjection(
contextBuilder.groupBy(), contextBuilder.aggregations()));
CollectNode collectNode = PlanNodeBuilder.distributingCollect(
tableInfo,
whereClause,
contextBuilder.toCollect(),
table.querySpec().where(),
splitPoints.leaves(),
Lists.newArrayList(routing.nodes()),
contextBuilder.getAndClearProjections()
ImmutableList.<Projection>of(groupProjection)
);

contextBuilder.nextStep();

// mergeNode for reducer

contextBuilder.addProjection(new GroupProjection(
contextBuilder.groupBy(),
contextBuilder.aggregations()));
// start: Reducer
List<Symbol> collectOutputs = new ArrayList<>(
groupBy.size() +
splitPoints.aggregates().size());
collectOutputs.addAll(groupBy);
collectOutputs.addAll(splitPoints.aggregates());

List<Projection> reducerProjections = new LinkedList<>();
reducerProjections.add(projectionBuilder.groupProjection(
collectOutputs,
table.querySpec().groupBy(),
splitPoints.aggregates(),
Aggregation.Step.PARTIAL,
Aggregation.Step.FINAL));

OrderBy orderBy = table.querySpec().orderBy();
if (orderBy != null) {
table.tableRelation().validateOrderBy(orderBy);
}

if (havingQuery != null) {
FilterProjection fp = new FilterProjection(havingQuery);
fp.outputs(contextBuilder.genInputColumns(collectNode.finalProjection().get().outputs(), table.querySpec().outputs().size()));
contextBuilder.addProjection(fp);
HavingClause havingClause = table.querySpec().having();
if(havingClause != null){
if (havingClause.noMatch()) {
return new NoopPlannedAnalyzedRelation(table);
} else if (havingClause.hasQuery()){
reducerProjections.add(projectionBuilder.filterProjection(
collectOutputs,
havingClause.query()
));
}
}

boolean topNDone = false;
OrderBy orderBy = table.querySpec().orderBy();
if (table.querySpec().isLimited()) {
topNDone = true;
TopNProjection topN;
int limit = firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT) + table.querySpec().offset();
if (orderBy == null) {
topN = new TopNProjection(limit, 0);
} else {
topN = new TopNProjection(limit, 0,
tableRelation.resolveAndValidateOrderBy(table.querySpec().orderBy()),
orderBy.reverseFlags(),
orderBy.nullsFirst()
);
}
topN.outputs(contextBuilder.outputs());
contextBuilder.addProjection((topN));
reducerProjections.add(projectionBuilder.topNProjection(
collectOutputs,
orderBy,
0,
limit,
table.querySpec().outputs()
));
} else {
contextBuilder.addProjection((writerProjection));
reducerProjections.add(writerProjection);
}

MergeNode mergeNode = PlanNodeBuilder.distributedMerge(collectNode, contextBuilder.getAndClearProjections());
MergeNode mergeNode = PlanNodeBuilder.distributedMerge(collectNode, reducerProjections);

// local merge on handler
List<Projection> handlerProjections = new ArrayList<>();
if (table.querySpec().isLimited()) {
List<Symbol> outputs;
List<Symbol> orderBySymbols;
if (topNDone) {
orderBySymbols = contextBuilder.passThroughOrderBy();
outputs = contextBuilder.passThroughOutputs();
} else {
orderBySymbols = contextBuilder.orderBy();
outputs = contextBuilder.outputs();
}
// mergeNode handler
TopNProjection topN;
int limit = firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT);
if (orderBy == null) {
topN = new TopNProjection(limit, table.querySpec().offset());
} else {
topN = new TopNProjection(limit, table.querySpec().offset(),
orderBySymbols,
orderBy.reverseFlags(),
orderBy.nullsFirst()
);
}
topN.outputs(outputs);
contextBuilder.addProjection(topN);
contextBuilder.addProjection(writerProjection);
handlerProjections.add(projectionBuilder.topNProjection(
table.querySpec().outputs(),
orderBy,
table.querySpec().offset(),
firstNonNull(table.querySpec().limit(), Constants.DEFAULT_SELECT_LIMIT),
table.querySpec().outputs()
));
handlerProjections.add(writerProjection);

} else {
// sum up distributed indexWriter results
contextBuilder.addProjection(QueryAndFetchConsumer.localMergeProjection(functions));
handlerProjections.add(QueryAndFetchConsumer.localMergeProjection(functions));
}
MergeNode localMergeNode = PlanNodeBuilder.localMerge(contextBuilder.getAndClearProjections(), mergeNode);
MergeNode localMergeNode = PlanNodeBuilder.localMerge(handlerProjections, mergeNode);
return new DistributedGroupBy(
collectNode,
mergeNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ public AnalyzedRelation visitQueriedTable(QueriedTable table, Context context) {
return table;
}
context.result = true;
return ReduceOnCollectorGroupByConsumer.optimizedReduceOnCollectorGroupBy(table, table.tableRelation(),
whereClause, null);
return ReduceOnCollectorGroupByConsumer.optimizedReduceOnCollectorGroupBy(table, table.tableRelation(), null);
}


Expand All @@ -130,7 +129,7 @@ protected AnalyzedRelation visitAnalyzedRelation(AnalyzedRelation relation, Cont
* CollectNode ( GroupProjection, [FilterProjection], [TopN] )
* LocalMergeNode ( [TopN], IndexWriterProjection )
*/
public static AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, TableRelation tableRelation, WhereClause whereClause, ColumnIndexWriterProjection indexWriterProjection) {
public static AnalyzedRelation optimizedReduceOnCollectorGroupBy(QueriedTable table, TableRelation tableRelation, ColumnIndexWriterProjection indexWriterProjection) {
assert GroupByConsumer.groupedByClusteredColumnOrPrimaryKeys(tableRelation, table.querySpec().groupBy()) : "not grouped by clustered column or primary keys";
TableInfo tableInfo = tableRelation.tableInfo();
GroupByConsumer.validateGroupBySymbols(tableRelation, table.querySpec().groupBy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,4 +538,20 @@ public void testInsertFromQueryWithAggregateWithinScalarFunction() throws Except
assertThat((int)response.rows()[3][1], is(4));
}

@Test
public void testInsertFromSubQueryDistributedGroupBy() throws Exception {
this.setup.setUpCharacters();
waitNoPendingTasksOnAll();
execute("create table t (id int, name string)");
ensureGreen();
execute("insert into t (id, name) (select id, name from characters group by id, name)");
assertThat(response.rowCount(), is(4L));
refresh();
execute("select id, name from t order by id");
assertThat(response.rowCount(), is(4L));
assertThat((int)response.rows()[3][0], is(4));
assertThat((String)response.rows()[3][1], is("Arthur"));

}

}

0 comments on commit b8c4ff9

Please sign in to comment.