Skip to content

Commit

Permalink
Analytic plan optimization: taking advantage of the hash partitioning…
Browse files Browse the repository at this point in the history
… of the preceding aggregation.

- determine the partition group that has maximal intersection of its partition exprs with the
  preceding grouping exprs
- if that intersection's expected ndv > #nodes, make that partition group the first one in the sequence
  to be computed and reduce the hash partition of the preceding aggregation to that intersection

Change-Id: I612b4a260a8975deb495e5d34c32f03db4a7cca7
Reviewed-on: http://gerrit.sjc.cloudera.com:8080/4451
Reviewed-by: Marcel Kornacker <marcel@cloudera.com>
Tested-by: jenkins
  • Loading branch information
Marcel Kornacker authored and Nong Li committed Sep 23, 2014
1 parent 87e9c77 commit 0b3124a
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 33 deletions.
Expand Up @@ -96,13 +96,19 @@ public enum AggPhase {
protected final ExprSubstitutionMap outputToIntermediateTupleSmap_ =
new ExprSubstitutionMap();

// if set, a subset of groupingExprs_; set and used during planning
private List<Expr> partitionExprs_;

// Private c'tor: forwards groupingExprs and aggExprs to the superclass c'tor and
// records the aggregation phase. The copies of groupingExprs and aggExprs mentioned
// for this c'tor are presumably made by the superclass (not visible here).
private AggregateInfo(ArrayList<Expr> groupingExprs,
    ArrayList<FunctionCallExpr> aggExprs, AggPhase aggPhase) {
  super(groupingExprs, aggExprs);
  this.aggPhase_ = aggPhase;
}

/**
 * Returns the partition exprs recorded via setPartitionExprs(); null if none
 * have been set.
 */
public List<Expr> getPartitionExprs() {
  return partitionExprs_;
}
/**
 * Records the partition exprs to use for this aggregation. The list is stored
 * by reference (no copy is made).
 */
public void setPartitionExprs(List<Expr> exprs) {
  partitionExprs_ = exprs;
}

/**
* Creates complete AggregateInfo for groupingExprs and aggExprs, including
* aggTupleDesc and aggTupleSMap. If parameter tupleDesc != null, sets aggTupleDesc to
Expand Down
18 changes: 18 additions & 0 deletions fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
Expand Up @@ -241,6 +241,9 @@ private static class GlobalState {
// only visible at the root Analyzer
private final Map<SlotId, EquivalenceClassId> equivClassBySlotId = Maps.newHashMap();

// map for each slot to the canonical slot of its equivalence class
private final ExprSubstitutionMap equivClassSmap = new ExprSubstitutionMap();

// represents the direct and transitive value transfers between slots
private ValueTransferGraph valueTransferGraph;

Expand Down Expand Up @@ -1430,6 +1433,19 @@ public void computeEquivClasses() {
}
} while (merged);

// populate equivClassSmap
for (EquivalenceClassId id: globalState_.equivClassMembers.keySet()) {
List<SlotId> members = globalState_.equivClassMembers.get(id);
if (members.isEmpty()) continue;
SlotDescriptor canonicalSlotDesc =
globalState_.descTbl.getSlotDesc(members.get(0));
for (SlotId slotId: globalState_.equivClassMembers.get(id)) {
SlotDescriptor slotDesc = globalState_.descTbl.getSlotDesc(slotId);
globalState_.equivClassSmap.put(
new SlotRef(slotDesc), new SlotRef(canonicalSlotDesc));
}
}

// populate equivClassBySlotId
for (EquivalenceClassId id: globalState_.equivClassMembers.keySet()) {
for (SlotId slotId: globalState_.equivClassMembers.get(id)) {
Expand Down Expand Up @@ -1459,6 +1475,8 @@ public EquivalenceClassId getEquivClassId(SlotId slotId) {
return globalState_.equivClassBySlotId.get(slotId);
}

/**
 * Returns the global substitution map that maps each slot to the canonical slot
 * of its equivalence class. The map itself is returned, not a copy.
 */
public ExprSubstitutionMap getEquivClassSmap() {
  return globalState_.equivClassSmap;
}

/**
* Returns true if l1 and l2 only contain SlotRefs, and for all SlotRefs in l1 there
* is an equivalent SlotRef in l2, and vice versa.
Expand Down
38 changes: 38 additions & 0 deletions fe/src/main/java/com/cloudera/impala/analysis/Expr.java
Expand Up @@ -487,6 +487,7 @@ protected void treeToThriftHelper(TExpr container) {
* the exprs have an invalid number of distinct values.
*/
public static long getNumDistinctValues(List<Expr> exprs) {
if (exprs == null || exprs.isEmpty()) return 0;
long numDistinctValues = 1;
for (Expr expr: exprs) {
if (expr.getNumDistinctValues() == -1) {
Expand Down Expand Up @@ -594,6 +595,43 @@ public static <C extends Expr> boolean isSubset(List<C> l1, List<C> l2) {
return l2.containsAll(l1);
}

/**
 * Returns a new list holding those elements of l1 that are also contained in l2
 * (as determined by l2.contains()), in l1's iteration order.
 */
public static <C extends Expr> List<C> intersect(List<C> l1, List<C> l2) {
  List<C> common = new ArrayList<C>();
  for (C candidate: l1) {
    if (!l2.contains(candidate)) continue;
    common.add(candidate);
  }
  return common;
}

/**
 * Computes the intersection of l1 and l2 modulo 'smap': both lists are substituted
 * with smap and the substituted exprs are compared with equals().
 * i1 and i2 are cleared first. For every l1 element (in order) whose substituted
 * form equals the substituted form of some l2 element, the original l1 element is
 * appended to i1 and the first such original l2 element is appended to i2, so
 * i1 and i2 end up with pairwise-corresponding entries.
 */
public static void intersect(Analyzer analyzer,
    List<Expr> l1, List<Expr> l2, ExprSubstitutionMap smap,
    List<Expr> i1, List<Expr> i2) {
  i1.clear();
  i2.clear();
  List<Expr> substituted1 = Expr.substituteList(l1, smap, analyzer);
  Preconditions.checkState(substituted1.size() == l1.size());
  List<Expr> substituted2 = Expr.substituteList(l2, smap, analyzer);
  Preconditions.checkState(substituted2.size() == l2.size());
  for (int idx1 = 0; idx1 < substituted1.size(); ++idx1) {
    Expr lhs = substituted1.get(idx1);
    // advance to the first l2 position whose substituted expr matches
    int idx2 = 0;
    while (idx2 < substituted2.size() && !lhs.equals(substituted2.get(idx2))) {
      ++idx2;
    }
    if (idx2 == substituted2.size()) continue;
    i1.add(l1.get(idx1));
    i2.add(l2.get(idx2));
  }
}

@Override
public int hashCode() {
if (id_ == null) {
Expand Down
125 changes: 111 additions & 14 deletions fe/src/main/java/com/cloudera/impala/planner/AnalyticPlanner.java
Expand Up @@ -95,13 +95,20 @@ public AnalyticPlanner(List<TupleId> stmtTupleIds,
}

/**
* Augment planFragment with plan nodes that implement single-node evaluation of
* the AnalyticExprs in analyticInfo.
* TODO: create partition groups that are not based on set identity: the subset
* of partition exprs common to the partition group should be large enough to
* parallelize across all machines
* Return plan tree that augments 'root' with plan nodes that implement single-node
* evaluation of the AnalyticExprs in analyticInfo.
* This plan takes into account a possible hash partition of its input on
* 'groupingExprs'; if this is non-null, it returns in 'inputPartitionExprs'
* a subset of the grouping exprs which should be used for the aggregate
* hash partitioning during the parallelization of 'root'.
* TODO: when generating sort orders for the sort groups, optimize the ordering
* of the partition exprs (so that subsequent sort operations see the input sorted
* on a prefix of their required sort exprs)
* TODO: when merging sort groups, recognize equivalent exprs
* (using the equivalence classes) rather than looking for expr equality
*/
public PlanNode createSingleNodePlan(PlanNode root) throws ImpalaException {
public PlanNode createSingleNodePlan(PlanNode root,
List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException {
List<WindowGroup> windowGroups = collectWindowGroups();
for (WindowGroup g: windowGroups) {
g.init(analyzer_);
Expand All @@ -112,11 +119,18 @@ public PlanNode createSingleNodePlan(PlanNode root) throws ImpalaException {
g.init();
}
List<PartitionGroup> partitionGroups = collectPartitionGroups(sortGroups);
mergePartitionGroups(partitionGroups, root.getNumNodes());
orderGroups(partitionGroups);
if (groupingExprs != null) {
Preconditions.checkNotNull(inputPartitionExprs);
computeInputPartitionExprs(
partitionGroups, groupingExprs, root.getNumNodes(), inputPartitionExprs);
}

for (PartitionGroup partitionGroup: partitionGroups) {
for (int i = 0; i < partitionGroup.sortGroups.size(); ++i) {
root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i), i == 0);
root = createSortGroupPlan(root, partitionGroup.sortGroups.get(i),
i == 0 ? partitionGroup.partitionByExprs : null);
}
}

Expand Down Expand Up @@ -148,6 +162,75 @@ private void mergeSortGroups(List<SortGroup> sortGroups) {
} while (hasMerged);
}

/**
 * Repeatedly coalesces pairs of partition groups until no further pair qualifies.
 * A pair qualifies if the ndv estimate of the intersection of their partition
 * exprs is valid (non-negative) and at least numNodes, so that the resulting plan
 * still parallelizes across all nodes.
 * The second group of a qualifying pair is merged into the first and removed from
 * partitionGroups; the search then restarts from the beginning of the list.
 */
private void mergePartitionGroups(
    List<PartitionGroup> partitionGroups, int numNodes) {
  boolean mergedInPass;
  do {
    mergedInPass = false;
    search:
    for (PartitionGroup target: partitionGroups) {
      for (PartitionGroup candidate: partitionGroups) {
        if (target == candidate) continue;
        long ndv = Expr.getNumDistinctValues(
            Expr.intersect(target.partitionByExprs, candidate.partitionByExprs));
        // skip if we didn't get a usable value or the number of partitions
        // is too small
        boolean enoughPartitions = ndv >= 0 && ndv >= numNodes;
        if (!enoughPartitions) continue;
        target.merge(candidate);
        partitionGroups.remove(candidate);
        mergedInPass = true;
        // the list was modified: abandon both loops and rescan from the start
        break search;
      }
    }
  } while (mergedInPass);
}

/**
 * Determine the partition group that has the maximum intersection in terms
 * of the estimated ndv of the partition exprs with groupingExprs.
 * If that maximum ndv exceeds numNodes, the partition group is placed at the front
 * of partitionGroups, with its partition exprs reduced to the intersection, and the
 * intersecting groupingExprs are returned in inputPartitionExprs.
 * Otherwise partitionGroups is left untouched and inputPartitionExprs is empty.
 * Only the selected partition group is modified; candidates that were considered
 * but not selected keep their original partition exprs.
 */
private void computeInputPartitionExprs(List<PartitionGroup> partitionGroups,
    List<Expr> groupingExprs, int numNodes, List<Expr> inputPartitionExprs) {
  inputPartitionExprs.clear();
  // find partition group with maximum intersection
  long maxNdv = 0;
  PartitionGroup maxPg = null;
  List<Expr> maxPartitionExprs = null;
  List<Expr> maxGroupingExprs = null;
  for (PartitionGroup pg: partitionGroups) {
    List<Expr> l1 = Lists.newArrayList();
    List<Expr> l2 = Lists.newArrayList();
    Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs,
        analyzer_.getEquivClassSmap(), l1, l2);
    // TODO: also look at l2 and take the max?
    long ndv = Expr.getNumDistinctValues(l1);
    if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;
    // found a better partition group; only remember it here, so that groups
    // which are later superseded (or never selected) are not modified
    maxPg = pg;
    maxPartitionExprs = l1;
    maxGroupingExprs = l2;
    maxNdv = ndv;
  }

  if (maxNdv > numNodes) {
    Preconditions.checkNotNull(maxPg);
    // we found a partition group that gives us enough parallelism;
    // reduce its partition exprs to the intersection and move it to the front
    maxPg.partitionByExprs = maxPartitionExprs;
    partitionGroups.remove(maxPg);
    partitionGroups.add(0, maxPg);
    inputPartitionExprs.addAll(maxGroupingExprs);
  }
}

/**
* Order partition groups (and the sort groups within them) by increasing
* totalOutputTupleSize. This minimizes the total volume of data that needs to be
Expand Down Expand Up @@ -216,10 +299,12 @@ private SortInfo createSortInfo(

/**
* Create plan tree for the entire sort group, including all contained window groups.
* Marks the SortNode as requiring its input to be partitioned if isFirstInPartition.
* Marks the SortNode as requiring its input to be partitioned if partitionExprs
* is not null (partitionExprs represent the data partition of the entire partition
* group of which this sort group is a part).
*/
private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
boolean isFirstInPartition) throws ImpalaException {
List<Expr> partitionExprs) throws ImpalaException {
List<Expr> partitionByExprs = sortGroup.partitionByExprs;
List<OrderByElement> orderByElements = sortGroup.orderByElements;
ExprSubstitutionMap sortSmap = null;
Expand All @@ -234,7 +319,8 @@ private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
List<Expr> sortExprs = Lists.newArrayList(partitionByExprs);
List<Boolean> isAsc =
Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));
// TODO: should nulls come first or last?
// TODO: utilize a sort direction and nulls first/last setting that benefits
// subsequent sort groups
List<Boolean> nullsFirst =
Lists.newArrayList(Collections.nCopies(sortExprs.size(), new Boolean(true)));

Expand All @@ -253,12 +339,12 @@ private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup,
// to be executed like a regular distributed sort
if (!partitionByExprs.isEmpty()) sortNode.setIsAnalyticSort(true);

if (isFirstInPartition) {
if (partitionExprs != null) {
// create required input partition
DataPartition inputPartition = DataPartition.UNPARTITIONED;
if (!partitionByExprs.isEmpty()) {
inputPartition = new DataPartition(TPartitionType.HASH_PARTITIONED,
partitionByExprs);
if (!partitionExprs.isEmpty()) {
inputPartition =
new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
}
sortNode.setInputPartition(inputPartition);
}
Expand Down Expand Up @@ -702,6 +788,17 @@ public void add(SortGroup sortGroup) {
totalOutputTupleSize += sortGroup.totalOutputTupleSize;
}

/**
 * Merge 'other' into 'this'
 * - partitionByExprs is the intersection of the two
 * - sortGroups becomes the union
 * - totalOutputTupleSize becomes the sum of the two, mirroring what
 *   add(SortGroup) does for each sort group it takes on
 */
public void merge(PartitionGroup other) {
  partitionByExprs = Expr.intersect(partitionByExprs, other.partitionByExprs);
  Preconditions.checkState(Expr.getNumDistinctValues(partitionByExprs) >= 0);
  sortGroups.addAll(other.sortGroups);
  // keep the size estimate in sync with the sort groups we just took over;
  // partition groups are ordered by totalOutputTupleSize
  totalOutputTupleSize += other.totalOutputTupleSize;
}

/**
* Order sort groups by increasing totalOutputTupleSize. This minimizes the total
* volume of data that needs to be sorted.
Expand Down
Expand Up @@ -119,7 +119,8 @@ public String getExplainString(String prefix, String detailPrefix,
if (totalNumPartitions == -1) {
output.append(detailPrefix + "partitions=unavailable");
} else {
output.append(detailPrefix + "partitions=" + totalNumPartitions);
output.append(detailPrefix + "partitions="
+ (totalNumPartitions == 0 ? 1 : totalNumPartitions));
}
output.append("\n");
if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
Expand Down
24 changes: 18 additions & 6 deletions fe/src/main/java/com/cloudera/impala/planner/Planner.java
Expand Up @@ -95,6 +95,12 @@ public class Planner {
* Create plan fragments for an analyzed statement, given a set of execution options.
* The fragments are returned in a list such that element i of that list can
* only consume output of the following fragments j > i.
*
* TODO: take data partition of the plan fragments into account; in particular,
* coordinate between hash partitioning for aggregation and hash partitioning
* for analytic computation more generally than what createQueryPlan() does
* right now (the coordination only happens if the same select block does both
* the aggregation and analytic computation).
*/
public ArrayList<PlanFragment> createPlanFragments(
AnalysisContext.AnalysisResult analysisResult, TQueryOptions queryOptions)
Expand Down Expand Up @@ -782,11 +788,10 @@ private PlanFragment createAggregationFragment(AggregationNode node,
if (hasGrouping) {
// the parent fragment is partitioned on the grouping exprs;
// substitute grouping exprs to reference the *output* of the agg, not the input
// TODO: add infrastructure so that all PlanNodes have smaps to make this
// process of turning exprs into executable exprs less ad-hoc; might even want to
// introduce another mechanism that simply records a mapping of slots
List<Expr> partitionExprs = Expr.substituteList(
groupingExprs, node.getAggInfo().getIntermediateSmap(), analyzer);
List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
if (partitionExprs == null) partitionExprs = groupingExprs;
partitionExprs = Expr.substituteList(
partitionExprs, node.getAggInfo().getIntermediateSmap(), analyzer);
parentPartition =
new DataPartition(TPartitionType.HASH_PARTITIONED, partitionExprs);
} else {
Expand Down Expand Up @@ -1004,7 +1009,14 @@ private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disa
stmt.getMaterializedTupleIds(stmtTupleIds);
AnalyticPlanner analyticPlanner =
new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, nodeIdGenerator_);
root = analyticPlanner.createSingleNodePlan(root);
List<Expr> inputPartitionExprs = Lists.newArrayList();
AggregateInfo aggInfo = ((SelectStmt) stmt).getAggInfo();
root = analyticPlanner.createSingleNodePlan(root,
aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
// analytic computation will benefit from a partition on inputPartitionExprs
aggInfo.setPartitionExprs(inputPartitionExprs);
}
}
} else {
Preconditions.checkState(stmt instanceof UnionStmt);
Expand Down

0 comments on commit 0b3124a

Please sign in to comment.