PHOENIX-2758 Ordered GROUP BY not occurring with leading PK equality expression
jtaylor-sfdc committed Apr 1, 2016
1 parent 5d6865e commit 838a60b
Showing 4 changed files with 224 additions and 115 deletions.
@@ -33,6 +33,7 @@
import java.util.Properties;

import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.junit.Test;


@@ -261,4 +262,49 @@ public void testGroupByArray() throws Exception {
assertFalse(rs.next());
conn.close();
}

@Test
public void testGroupByOrderPreserving() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = DriverManager.getConnection(getUrl(), props);
conn.createStatement().execute("CREATE TABLE T (ORGANIZATION_ID char(15) not null, \n" +
"JOURNEY_ID char(15) not null, \n" +
"DATASOURCE SMALLINT not null, \n" +
"MATCH_STATUS TINYINT not null, \n" +
"EXTERNAL_DATASOURCE_KEY varchar(30), \n" +
"ENTITY_ID char(15) not null, \n" +
"CONSTRAINT PK PRIMARY KEY (\n" +
" ORGANIZATION_ID, \n" +
" JOURNEY_ID, \n" +
" DATASOURCE, \n" +
" MATCH_STATUS,\n" +
" EXTERNAL_DATASOURCE_KEY,\n" +
" ENTITY_ID))");
conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abc', '666667777788888')");
conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abcd', '666667777788889')");
conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abc', '666667777788899')");
conn.commit();
String query =
"SELECT COUNT(1), EXTERNAL_DATASOURCE_KEY As DUP_COUNT\n" +
" FROM T \n" +
" WHERE JOURNEY_ID='333334444455555' AND \n" +
" DATASOURCE=0 AND MATCH_STATUS <= 1 and \n" +
" ORGANIZATION_ID='000001111122222' \n" +
" GROUP BY MATCH_STATUS, EXTERNAL_DATASOURCE_KEY \n" +
" HAVING COUNT(1) > 1";
ResultSet rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(2,rs.getInt(1));
assertEquals("abc", rs.getString(2));
assertFalse(rs.next());

rs = conn.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL 1-WAY RANGE SCAN OVER T ['000001111122222','333334444455555',0,*] - ['000001111122222','333334444455555',0,1]\n" +
" SERVER FILTER BY FIRST KEY ONLY\n" +
" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [MATCH_STATUS, EXTERNAL_DATASOURCE_KEY]\n" +
"CLIENT MERGE SORT\n" +
"CLIENT FILTER BY COUNT(1) > 1",QueryUtil.getExplainPlan(rs));
}

}
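For context, the GROUP BY above is order-preserving because the WHERE clause pins ORGANIZATION_ID, JOURNEY_ID and DATASOURCE (the leading PK columns) with equality, and the grouping columns MATCH_STATUS and EXTERNAL_DATASOURCE_KEY occupy the PK positions that immediately follow, so rows already come off the scan in group order. A minimal sketch of that "no gaps in PK positions" rule is shown below; it is an illustration only (Phoenix's real check lives in OrderPreservingTracker), and the class, method and inputs are hypothetical.

import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustration only: a simplified version of the order-preserving GROUP BY rule
// exercised by the test above. Phoenix's actual check is OrderPreservingTracker.
public class OrderPreservingGroupBySketch {

    // Returns true when the GROUP BY can reuse the scan's row key order: every PK
    // position before the first GROUP BY column is fixed by an equality filter, and
    // the GROUP BY columns occupy consecutive PK positions right after that prefix.
    static boolean isOrderPreserving(Set<Integer> equalityPkPositions,
                                     List<Integer> groupByPkPositions) {
        int expected = 0;
        // Skip the leading PK columns pinned to a single value by the WHERE clause.
        while (equalityPkPositions.contains(expected)) {
            expected++;
        }
        // The GROUP BY columns must then cover consecutive PK positions with no gaps.
        for (int pos : groupByPkPositions) {
            if (pos != expected) {
                return false;
            }
            expected++;
        }
        return true;
    }

    public static void main(String[] args) {
        // PK positions as in the test table: ORGANIZATION_ID(0), JOURNEY_ID(1),
        // DATASOURCE(2), MATCH_STATUS(3), EXTERNAL_DATASOURCE_KEY(4), ENTITY_ID(5).
        Set<Integer> equality = new TreeSet<>(List.of(0, 1, 2));
        System.out.println(isOrderPreserving(equality, List.of(3, 4))); // true: ordered aggregation
        System.out.println(isOrderPreserving(equality, List.of(4, 5))); // false: gap at MATCH_STATUS(3)
    }
}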
@@ -57,14 +57,33 @@ public class GroupByCompiler {
public static class GroupBy {
private final List<Expression> expressions;
private final List<Expression> keyExpressions;
private final String scanAttribName;
public static final GroupByCompiler.GroupBy EMPTY_GROUP_BY = new GroupBy(new GroupByBuilder());
private final boolean isOrderPreserving;
public static final GroupByCompiler.GroupBy EMPTY_GROUP_BY = new GroupBy(new GroupByBuilder()) {
@Override
public void explain(List<String> planSteps, Integer limit) {
}
@Override
public String getScanAttribName() {
return null;
}
};
public static final GroupByCompiler.GroupBy UNGROUPED_GROUP_BY = new GroupBy(new GroupByBuilder().setIsOrderPreserving(true)) {
@Override
public void explain(List<String> planSteps, Integer limit) {
planSteps.add(" SERVER AGGREGATE INTO SINGLE ROW");
}
@Override
public String getScanAttribName() {
return BaseScannerRegionObserver.UNGROUPED_AGG;
}
};

private GroupBy(GroupByBuilder builder) {
this.expressions = ImmutableList.copyOf(builder.expressions);
this.keyExpressions = ImmutableList.copyOf(builder.keyExpressions);
this.scanAttribName = builder.scanAttribName;
assert(expressions.size() == keyExpressions.size());
this.keyExpressions = builder.expressions == builder.keyExpressions ?
this.expressions : builder.keyExpressions == null ? null :
ImmutableList.copyOf(builder.keyExpressions);
this.isOrderPreserving = builder.isOrderPreserving;
}

public List<Expression> getExpressions() {
@@ -76,129 +95,47 @@ public List<Expression> getKeyExpressions() {
}

public String getScanAttribName() {
return scanAttribName;
return isOrderPreserving ?
BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS :
BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
}

public boolean isEmpty() {
return expressions.isEmpty();
}

public static class GroupByBuilder {
private String scanAttribName;
private List<Expression> expressions = Collections.emptyList();
private List<Expression> keyExpressions = Collections.emptyList();

public GroupByBuilder() {
}

public GroupByBuilder setScanAttribName(String scanAttribName) {
this.scanAttribName = scanAttribName;
return this;
}

public GroupByBuilder setExpressions(List<Expression> expressions) {
this.expressions = expressions;
return this;
}

public GroupByBuilder setKeyExpressions(List<Expression> keyExpressions) {
this.keyExpressions = keyExpressions;
return this;
}

public GroupBy build() {
return new GroupBy(this);
}
}

public boolean isOrderPreserving() {
return !BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS.equals(scanAttribName);
return isOrderPreserving;
}

public void explain(List<String> planSteps, Integer limit) {
if (scanAttribName != null) {
if (BaseScannerRegionObserver.UNGROUPED_AGG.equals(scanAttribName)) {
planSteps.add(" SERVER AGGREGATE INTO SINGLE ROW");
} else if (BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS.equals(scanAttribName)) {
planSteps.add(" SERVER AGGREGATE INTO DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S")));
} else {
planSteps.add(" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S")));
public GroupBy compile(StatementContext context, TupleProjector tupleProjector) throws SQLException {
boolean isOrderPreserving = this.isOrderPreserving;
if (isOrderPreserving) {
OrderPreservingTracker tracker = new OrderPreservingTracker(context, GroupBy.EMPTY_GROUP_BY, Ordering.UNORDERED, expressions.size(), tupleProjector);
for (int i = 0; i < expressions.size(); i++) {
Expression expression = expressions.get(i);
tracker.track(expression);
}

// This is true if the GROUP BY is composed of only PK columns. We further check here that
// there are no "gaps" in the PK columns positions used (i.e. we start with the first PK
// column and use each subsequent one in PK order).
isOrderPreserving = tracker.isOrderPreserving();
}
}
}

/**
* Get list of columns in the GROUP BY clause.
* @param context query context kept between compilation of different query clauses
* @param statement SQL statement being compiled
* @return the {@link GroupBy} instance encapsulating the group by clause
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException {
List<ParseNode> groupByNodes = statement.getGroupBy();
/**
* Distinct can use an aggregate plan if there's no group by.
* Otherwise, we need to insert a step after the Merge that dedups.
* Order by only allowed on columns in the select distinct
*/
if (groupByNodes.isEmpty()) {
if (statement.isAggregate()) {
return new GroupBy.GroupByBuilder().setScanAttribName(BaseScannerRegionObserver.UNGROUPED_AGG).build();
}
if (!statement.isDistinct()) {
return GroupBy.EMPTY_GROUP_BY;
if (isOrderPreserving) {
return this;
}

groupByNodes = Lists.newArrayListWithExpectedSize(statement.getSelect().size());
for (AliasedNode aliasedNode : statement.getSelect()) {
groupByNodes.add(aliasedNode.getNode());
}
}

// Accumulate expressions in GROUP BY
ExpressionCompiler compiler =
new ExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY);
List<Pair<Integer,Expression>> groupBys = Lists.newArrayListWithExpectedSize(groupByNodes.size());
OrderPreservingTracker tracker = new OrderPreservingTracker(context, GroupBy.EMPTY_GROUP_BY, Ordering.UNORDERED, groupByNodes.size(), tupleProjector);
for (int i = 0; i < groupByNodes.size(); i++) {
ParseNode node = groupByNodes.get(i);
Expression expression = node.accept(compiler);
if (!expression.isStateless()) {
if (compiler.isAggregate()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_GROUP_BY)
.setMessage(expression.toString()).build().buildException();
}
tracker.track(expression);
List<Expression> expressions = Lists.newArrayListWithExpectedSize(this.expressions.size());
List<Expression> keyExpressions = expressions;
List<Pair<Integer,Expression>> groupBys = Lists.newArrayListWithExpectedSize(this.expressions.size());
for (int i = 0; i < this.expressions.size(); i++) {
Expression expression = this.expressions.get(i);
groupBys.add(new Pair<Integer,Expression>(i,expression));
}
compiler.reset();
}

if (groupBys.isEmpty()) {
return GroupBy.EMPTY_GROUP_BY;
}

boolean isRowKeyOrderedGrouping = isInRowKeyOrder && tracker.isOrderPreserving();
List<Expression> expressions = Lists.newArrayListWithExpectedSize(groupBys.size());
List<Expression> keyExpressions = expressions;
String groupExprAttribName;
// This is true if the GROUP BY is composed of only PK columns. We further check here that
// there are no "gaps" in the PK columns positions used (i.e. we start with the first PK
// column and use each subsequent one in PK order).
if (isRowKeyOrderedGrouping) {
groupExprAttribName = BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS;
for (Pair<Integer,Expression> groupBy : groupBys) {
expressions.add(groupBy.getSecond());
}
} else {
/*
* Otherwise, our coprocessor needs to collect all distinct groups within a region, sort them, and
* hold on to them until the scan completes.
*/
groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
/*
* If we're not ordered along the PK axis, our coprocessor needs to collect all distinct groups within
* a region, sort them, and hold on to them until the scan completes.
* Put fixed length nullables at the end, so that we can represent null by the absence of the trailing
* value in the group by key. If there is more than one, we'll need to convert the ones not at the end
* into a Decimal so that we can use an empty byte array as our representation for null (which correctly
@@ -276,9 +213,99 @@ public int compare(Pair<Integer,Expression> gb1, Pair<Integer,Expression> gb2) {
// than one fixed and nullable types are used in a group by clause
keyExpressions.set(i, CoerceExpression.create(expression, keyType));
}

GroupBy groupBy = new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderPreserving).setExpressions(expressions).setKeyExpressions(keyExpressions).build();
return groupBy;
}

public static class GroupByBuilder {
private boolean isOrderPreserving;
private List<Expression> expressions = Collections.emptyList();
private List<Expression> keyExpressions = Collections.emptyList();

public GroupByBuilder() {
}

public GroupByBuilder setExpressions(List<Expression> expressions) {
this.expressions = expressions;
return this;
}

public GroupByBuilder setKeyExpressions(List<Expression> keyExpressions) {
this.keyExpressions = keyExpressions;
return this;
}

public GroupByBuilder setIsOrderPreserving(boolean isOrderPreserving) {
this.isOrderPreserving = isOrderPreserving;
return this;
}

public GroupBy build() {
return new GroupBy(this);
}
}

public void explain(List<String> planSteps, Integer limit) {
if (isOrderPreserving) {
planSteps.add(" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S")));
} else {
planSteps.add(" SERVER AGGREGATE INTO DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S")));
}
}
}

/**
* Get list of columns in the GROUP BY clause.
* @param context query context kept between compilation of different query clauses
* @param statement SQL statement being compiled
* @return the {@link GroupBy} instance encapsulating the group by clause
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
public static GroupBy compile(StatementContext context, SelectStatement statement, boolean isOrderPreserving) throws SQLException {
List<ParseNode> groupByNodes = statement.getGroupBy();
/**
* Distinct can use an aggregate plan if there's no group by.
* Otherwise, we need to insert a step after the Merge that dedups.
* Order by only allowed on columns in the select distinct
*/
if (groupByNodes.isEmpty()) {
if (statement.isAggregate()) {
return GroupBy.UNGROUPED_GROUP_BY;
}
if (!statement.isDistinct()) {
return GroupBy.EMPTY_GROUP_BY;
}

groupByNodes = Lists.newArrayListWithExpectedSize(statement.getSelect().size());
for (AliasedNode aliasedNode : statement.getSelect()) {
groupByNodes.add(aliasedNode.getNode());
}
}

GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(expressions).setKeyExpressions(keyExpressions).build();
// Accumulate expressions in GROUP BY
ExpressionCompiler compiler =
new ExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY);
List<Expression> expressions = Lists.newArrayListWithExpectedSize(groupByNodes.size());
for (int i = 0; i < groupByNodes.size(); i++) {
ParseNode node = groupByNodes.get(i);
Expression expression = node.accept(compiler);
if (!expression.isStateless()) {
if (compiler.isAggregate()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_GROUP_BY)
.setMessage(expression.toString()).build().buildException();
}
expressions.add(expression);
}
compiler.reset();
}

if (expressions.isEmpty()) {
return GroupBy.EMPTY_GROUP_BY;
}

GroupBy groupBy = new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderPreserving).setExpressions(expressions).setKeyExpressions(expressions).build();
return groupBy;
}
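Taken together, the GroupByCompiler changes split GROUP BY compilation into two phases: the static compile(context, select, isOrderPreserving) records the caller's row-key-order flag as a tentative answer, and the new GroupBy.compile(context, tupleProjector) instance method re-verifies that answer with an OrderPreservingTracker once the WHERE clause has been compiled and the ScanRanges are known. Below is a hedged sketch of the caller's side of that contract; the wrapper class and method are hypothetical, and the import paths are assumed from the 4.x source layout.

import java.sql.SQLException;

import org.apache.phoenix.compile.GroupByCompiler;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.parse.SelectStatement;

// Hypothetical wrapper mirroring how QueryCompiler.compileSingleFlatQuery drives
// the two-phase GROUP BY compilation introduced by this commit.
final class GroupByCompilationSketch {

    static GroupBy compileGroupBy(StatementContext context, SelectStatement select,
                                  TupleProjector tupleProjector, boolean isInRowKeyOrder)
            throws SQLException {
        // Phase 1: only the caller's hint about row key order is available here.
        GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder);

        // ... WHERE compilation would run at this point and establish the ScanRanges ...

        // Phase 2: re-check against the actual scan. If the OrderPreservingTracker
        // finds a gap in the PK positions, the returned GroupBy reports
        // isOrderPreserving() == false and getScanAttribName() falls back to
        // UNORDERED_GROUP_BY_EXPRESSIONS.
        return groupBy.compile(context, tupleProjector);
    }
}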

@@ -530,7 +530,7 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState
}
Integer limit = LimitCompiler.compile(context, select);

GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector, isInRowKeyOrder);
GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder);
// Optimize the HAVING clause by finding any group by expressions that can be moved
// to the WHERE clause
select = HavingCompiler.rewrite(context, select, groupBy);
@@ -542,6 +542,9 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState
}
Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
// Recompile GROUP BY now that we've figured out our ScanRanges so we know
// definitively whether or not we'll traverse in row key order.
groupBy = groupBy.compile(context, innerPlanTupleProjector);
context.setResolver(resolver); // recover resolver
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns, where);
OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder);
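With the recompile hook above in place, whether a query actually received the ordered plan is visible in its EXPLAIN output, which is what the new test asserts. Below is a small illustrative helper for that check; the class and method names are hypothetical, and only QueryUtil.getExplainPlan plus the plan strings emitted by GroupBy.explain() in this commit are assumed.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.phoenix.util.QueryUtil;

// Illustration only: detect from JDBC whether an aggregate query received the
// order-preserving GROUP BY plan, using the strings emitted by GroupBy.explain().
public final class ExplainPlanCheck {

    static boolean usesOrderedGroupBy(Connection conn, String query) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("EXPLAIN " + query)) {
            String plan = QueryUtil.getExplainPlan(rs);
            // "ORDERED DISTINCT ROWS": the server aggregates while scanning in row key order.
            // Plain "DISTINCT ROWS": the coprocessor must buffer and sort groups per region.
            return plan.contains("SERVER AGGREGATE INTO ORDERED DISTINCT ROWS");
        }
    }
}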
