Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set nodePageSizeHint on collectPhase in CrossJoinConsumer #2724

Merged
merged 1 commit into from
Oct 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sql/src/main/java/io/crate/planner/consumer/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class ConsumerContext {

private ValidationException validationException;

private Integer requiredPageSize;

public ConsumerContext(AnalyzedRelation rootRelation, Planner.Context plannerContext) {
this.rootRelation = rootRelation;
this.plannerContext = plannerContext;
Expand All @@ -55,4 +57,16 @@ public ValidationException validationException() {
public Planner.Context plannerContext() {
return plannerContext;
}


public void requiredPageSize(Integer requiredPageSize) {
this.requiredPageSize = requiredPageSize;
}

/**
* required pageSize that a parent relation might have specified.
*/
public Integer requiredPageSize() {
return requiredPageSize;
}
}
14 changes: 10 additions & 4 deletions sql/src/main/java/io/crate/planner/consumer/CrossJoinConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import io.crate.planner.projection.Projection;
import io.crate.planner.projection.TopNProjection;
import io.crate.planner.projection.builder.ProjectionBuilder;
import io.crate.planner.symbol.*;
import io.crate.planner.symbol.Field;
import io.crate.planner.symbol.Literal;
import io.crate.planner.symbol.Symbol;
import io.crate.sql.tree.QualifiedName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -117,11 +119,16 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
sortQueriedTables(relationOrder, queriedTables);


QueriedTableRelation left = queriedTables.get(0);
QueriedTableRelation right = queriedTables.get(1);
QueriedTableRelation<?> left = queriedTables.get(0);
QueriedTableRelation<?> right = queriedTables.get(1);

Integer limit = statement.querySpec().limit();
if (limit != null) {
context.requiredPageSize(limit + statement.querySpec().offset());
}
PlannedAnalyzedRelation leftPlan = context.plannerContext().planSubRelation(left, context);
PlannedAnalyzedRelation rightPlan = context.plannerContext().planSubRelation(right, context);
context.requiredPageSize(null);

Set<String> localExecutionNodes = ImmutableSet.of(clusterService.localNode().id());

Expand Down Expand Up @@ -167,7 +174,6 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
return new NestedLoop(nl, leftPlan, rightPlan, true);
}


@Nullable
private MergePhase mergePhase(ConsumerContext context,
Set<String> localExecutionNodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ private PlannedAnalyzedRelation normalSelect(QueriedTableRelation table,
collectPhase);
}
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, mergeNode, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new CollectAndMerge(collectPhase, mergeNode, plannerContext.jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public PlannedAnalyzedRelation visitQueriedDocTable(QueriedDocTable table, Consu
);
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, localMergePhase, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new QueryThenFetch(collectPhase, fetchPhase, localMergePhase, context.plannerContext().jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void enablePagingIfApplicable(CollectPhase collectPhase,
if (collectPhase.nodePageSizeHint() != null) {
// in the directResponse case nodePageSizeHint has probably be set to limit
// since it is now push based we can reduce the nodePageSizeHint
collectPhase.nodePageSizeHint(Paging.getWeightedPageSize(limit + offset, 1.0d / collectPhase.executionNodes().size()));
collectPhase.pageSizeHint(limit + offset);
}
}
}
14 changes: 12 additions & 2 deletions sql/src/main/java/io/crate/planner/node/dql/CollectPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,18 @@ public void distributionInfo(DistributionInfo distributionInfo) {
* NOTE: if the collectPhase provides a directResult (instead of push result) the nodePageSizeHint has to be set
* to the query hard-limit because there is no way to fetch more rows.
*/
public void nodePageSizeHint(Integer nodePageSize) {
this.nodePageSizeHint = nodePageSize;
public void nodePageSizeHint(Integer nodePageSizeHint) {
this.nodePageSizeHint = nodePageSizeHint;
}


/**
* Similar to {@link #nodePageSizeHint(Integer)} in that it sets the nodePageSizeHint, but the given
* pageSize is the total pageSize.
*
*/
public void pageSizeHint(Integer pageSize) {
nodePageSizeHint(Paging.getWeightedPageSize(pageSize, 1.0d / Math.max(1, executionNodes().size())));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ public void testOrderByPushDown() throws Exception {
assertThat(topNProjection.limit(), is(Constants.DEFAULT_SELECT_LIMIT));
}

@Test
public void testNodePageSizePushDown() throws Exception {
NestedLoop plan = plan("select u1.name from users u1, users u2 order by 1 limit 1000");
CollectPhase cpL = ((CollectAndMerge) plan.left().plan()).collectPhase();
assertThat(cpL.nodePageSizeHint(), is(750));

CollectPhase cpR = ((CollectAndMerge) plan.right().plan()).collectPhase();
assertThat(cpR.nodePageSizeHint(), is(750));
}

@Test
public void testCrossJoinWithGroupBy() throws Exception {
expectedException.expect(ValidationException.class);
Expand Down