Skip to content

Commit

Permalink
set nodePageSizeHint on collectPhase in CrossJoinConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Oct 13, 2015
1 parent 5517c70 commit 5533cd1
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 7 deletions.
14 changes: 14 additions & 0 deletions sql/src/main/java/io/crate/planner/consumer/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class ConsumerContext {

private ValidationException validationException;

private Integer requiredPageSize;

public ConsumerContext(AnalyzedRelation rootRelation, Planner.Context plannerContext) {
this.rootRelation = rootRelation;
this.plannerContext = plannerContext;
Expand All @@ -55,4 +57,16 @@ public ValidationException validationException() {
public Planner.Context plannerContext() {
return plannerContext;
}


public void requiredPageSize(Integer requiredPageSize) {
this.requiredPageSize = requiredPageSize;
}

/**
* required pageSize that a parent relation might have specified.
*/
public Integer requiredPageSize() {
return requiredPageSize;
}
}
14 changes: 10 additions & 4 deletions sql/src/main/java/io/crate/planner/consumer/CrossJoinConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import io.crate.planner.projection.Projection;
import io.crate.planner.projection.TopNProjection;
import io.crate.planner.projection.builder.ProjectionBuilder;
import io.crate.planner.symbol.*;
import io.crate.planner.symbol.Field;
import io.crate.planner.symbol.Literal;
import io.crate.planner.symbol.Symbol;
import io.crate.sql.tree.QualifiedName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -117,11 +119,16 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
sortQueriedTables(relationOrder, queriedTables);


QueriedTableRelation left = queriedTables.get(0);
QueriedTableRelation right = queriedTables.get(1);
QueriedTableRelation<?> left = queriedTables.get(0);
QueriedTableRelation<?> right = queriedTables.get(1);

Integer limit = statement.querySpec().limit();
if (limit != null) {
context.requiredPageSize(limit + statement.querySpec().offset());
}
PlannedAnalyzedRelation leftPlan = context.plannerContext().planSubRelation(left, context);
PlannedAnalyzedRelation rightPlan = context.plannerContext().planSubRelation(right, context);
context.requiredPageSize(null);

Set<String> localExecutionNodes = ImmutableSet.of(clusterService.localNode().id());

Expand Down Expand Up @@ -167,7 +174,6 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
return new NestedLoop(nl, leftPlan, rightPlan, true);
}


@Nullable
private MergePhase mergePhase(ConsumerContext context,
Set<String> localExecutionNodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ private PlannedAnalyzedRelation normalSelect(QueriedTableRelation table,
collectPhase);
}
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, mergeNode, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new CollectAndMerge(collectPhase, mergeNode, plannerContext.jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public PlannedAnalyzedRelation visitQueriedDocTable(QueriedDocTable table, Consu
);
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, localMergePhase, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new QueryThenFetch(collectPhase, fetchPhase, localMergePhase, context.plannerContext().jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void enablePagingIfApplicable(CollectPhase collectPhase,
if (collectPhase.nodePageSizeHint() != null) {
// in the directResponse case nodePageSizeHint has probably be set to limit
// since it is now push based we can reduce the nodePageSizeHint
collectPhase.nodePageSizeHint(Paging.getWeightedPageSize(limit + offset, 1.0d / collectPhase.executionNodes().size()));
collectPhase.pageSizeHint(limit + offset);
}
}
}
14 changes: 12 additions & 2 deletions sql/src/main/java/io/crate/planner/node/dql/CollectPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,18 @@ public void distributionInfo(DistributionInfo distributionInfo) {
* NOTE: if the collectPhase provides a directResult (instead of push result) the nodePageSizeHint has to be set
* to the query hard-limit because there is no way to fetch more rows.
*/
public void nodePageSizeHint(Integer nodePageSize) {
this.nodePageSizeHint = nodePageSize;
public void nodePageSizeHint(Integer nodePageSizeHint) {
this.nodePageSizeHint = nodePageSizeHint;
}


/**
* Similar to {@link #nodePageSizeHint(Integer)} in that it sets the nodePageSizeHint, but the given
* pageSize is the total pageSize.
*
*/
public void pageSizeHint(Integer pageSize) {
nodePageSizeHint(Paging.getWeightedPageSize(pageSize, 1.0d / Math.max(1, executionNodes().size())));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ public void testOrderByPushDown() throws Exception {
assertThat(topNProjection.limit(), is(Constants.DEFAULT_SELECT_LIMIT));
}

@Test
public void testNodePageSizePushDown() throws Exception {
NestedLoop plan = plan("select u1.name from users u1, users u2 order by 1 limit 1000");
CollectPhase cpL = ((CollectAndMerge) plan.left().plan()).collectPhase();
assertThat(cpL.nodePageSizeHint(), is(750));

CollectPhase cpR = ((CollectAndMerge) plan.right().plan()).collectPhase();
assertThat(cpR.nodePageSizeHint(), is(750));
}

@Test
public void testCrossJoinWithGroupBy() throws Exception {
expectedException.expect(ValidationException.class);
Expand Down

0 comments on commit 5533cd1

Please sign in to comment.