Skip to content

Commit

Permalink
fixup! set nodePageSizeHint on collectPhase in CrossJoinConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Oct 13, 2015
1 parent 3307ed0 commit 74d9d44
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 28 deletions.
14 changes: 14 additions & 0 deletions sql/src/main/java/io/crate/planner/consumer/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class ConsumerContext {

private ValidationException validationException;

private Integer requiredPageSize;

public ConsumerContext(AnalyzedRelation rootRelation, Planner.Context plannerContext) {
this.rootRelation = rootRelation;
this.plannerContext = plannerContext;
Expand All @@ -55,4 +57,16 @@ public ValidationException validationException() {
public Planner.Context plannerContext() {
return plannerContext;
}


public void requiredPageSize(int requiredPageSize) {
this.requiredPageSize = requiredPageSize;
}

/**
* required pageSize that a parent relation might have specified.
*/
public Integer requiredPageSize() {
return requiredPageSize;
}
}
29 changes: 4 additions & 25 deletions sql/src/main/java/io/crate/planner/consumer/CrossJoinConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@
import io.crate.analyze.relations.*;
import io.crate.exceptions.ValidationException;
import io.crate.metadata.OutputName;
import io.crate.operation.Paging;
import io.crate.operation.projectors.TopN;
import io.crate.planner.PlanVisitor;
import io.crate.planner.fetch.FetchRequiredVisitor;
import io.crate.planner.node.NoopPlannedAnalyzedRelation;
import io.crate.planner.node.dql.CollectAndMerge;
import io.crate.planner.node.dql.DQLPlanNode;
import io.crate.planner.node.dql.MergePhase;
import io.crate.planner.node.dql.join.NestedLoop;
Expand All @@ -54,7 +51,6 @@
public class CrossJoinConsumer implements Consumer {

private final Visitor visitor;
private final static PageSizePushDown PAGE_SIZE_PUSH_DOWN = new PageSizePushDown();

public CrossJoinConsumer(ClusterService clusterService,
AnalysisMetaData analysisMetaData) {
Expand Down Expand Up @@ -126,11 +122,13 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
QueriedTableRelation<?> left = queriedTables.get(0);
QueriedTableRelation<?> right = queriedTables.get(1);

Integer limit = statement.querySpec().limit();
if (limit != null) {
context.requiredPageSize(limit + statement.querySpec().offset());
}
PlannedAnalyzedRelation leftPlan = context.plannerContext().planSubRelation(left, context);
PlannedAnalyzedRelation rightPlan = context.plannerContext().planSubRelation(right, context);

pushDownPageSize(statement, leftPlan, rightPlan);

Set<String> localExecutionNodes = ImmutableSet.of(clusterService.localNode().id());

MergePhase leftMerge = mergePhase(
Expand Down Expand Up @@ -175,15 +173,6 @@ public PlannedAnalyzedRelation visitMultiSourceSelect(MultiSourceSelect statemen
return new NestedLoop(nl, leftPlan, rightPlan, true);
}

private void pushDownPageSize(MultiSourceSelect statement, PlannedAnalyzedRelation leftPlan, PlannedAnalyzedRelation rightPlan) {
Integer limit = statement.querySpec().limit();
if (limit != null) {
// NL plan is always push based, so it is possible reduce the nodePageSizeHint
PAGE_SIZE_PUSH_DOWN.process(leftPlan.plan(), limit + statement.querySpec().offset());
PAGE_SIZE_PUSH_DOWN.process(rightPlan.plan(), limit + statement.querySpec().offset());
}
}

@Nullable
private MergePhase mergePhase(ConsumerContext context,
Set<String> localExecutionNodes,
Expand Down Expand Up @@ -304,16 +293,6 @@ private boolean hasOutputsToFetch(QuerySpec querySpec) {

}

private static class PageSizePushDown extends PlanVisitor<Integer, Void> {

@Override
public Void visitCollectAndMerge(CollectAndMerge plan, Integer limitAndOffset) {
plan.collectPhase().nodePageSizeHint(
Paging.getWeightedPageSize(limitAndOffset, 1.0d / Math.max(1, plan.collectPhase().executionNodes().size())));
return null;
}
}

private static class SubRelationConverter extends AnalyzedRelationVisitor<MultiSourceSelect, QueriedTableRelation> {

private static final QueriedTableFactory<QueriedTable, TableRelation> QUERIED_TABLE_FACTORY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ private PlannedAnalyzedRelation normalSelect(QueriedTableRelation table,
collectPhase);
}
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, mergeNode, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new CollectAndMerge(collectPhase, mergeNode, plannerContext.jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public PlannedAnalyzedRelation visitQueriedDocTable(QueriedDocTable table, Consu
);
}

if (context.requiredPageSize() != null) {
collectPhase.pageSizeHint(context.requiredPageSize());
}
SimpleSelect.enablePagingIfApplicable(
collectPhase, localMergePhase, querySpec.limit(), querySpec.offset(), plannerContext.clusterService().localNode().id());
return new QueryThenFetch(collectPhase, fetchPhase, localMergePhase, context.plannerContext().jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void enablePagingIfApplicable(CollectPhase collectPhase,
if (collectPhase.nodePageSizeHint() != null) {
// in the directResponse case nodePageSizeHint has probably be set to limit
// since it is now push based we can reduce the nodePageSizeHint
collectPhase.nodePageSizeHint(Paging.getWeightedPageSize(limit + offset, 1.0d / collectPhase.executionNodes().size()));
collectPhase.pageSizeHint(limit + offset);
}
}
}
14 changes: 12 additions & 2 deletions sql/src/main/java/io/crate/planner/node/dql/CollectPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,18 @@ public void distributionInfo(DistributionInfo distributionInfo) {
* NOTE: if the collectPhase provides a directResult (instead of push result) the nodePageSizeHint has to be set
* to the query hard-limit because there is no way to fetch more rows.
*/
public void nodePageSizeHint(Integer nodePageSize) {
this.nodePageSizeHint = nodePageSize;
public void nodePageSizeHint(Integer nodePageSizeHint) {
this.nodePageSizeHint = nodePageSizeHint;
}


/**
* Similar to {@link #nodePageSizeHint(Integer)} in that it sets the nodePageSizeHint, but the given
* pageSize is the total pageSize.
*
*/
public void pageSizeHint(Integer pageSize) {
nodePageSizeHint(Paging.getWeightedPageSize(pageSize, 1.0d / Math.max(1, executionNodes().size())));
}

/**
Expand Down

0 comments on commit 74d9d44

Please sign in to comment.