From 838a60b9abfb9b65363db9e51cdb6bc32d2088c9 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Fri, 1 Apr 2016 10:34:08 -0700 Subject: [PATCH] PHOENIX-2758 Ordered GROUP BY not occurring with leading PK equality expression --- .../apache/phoenix/end2end/GroupByCaseIT.java | 46 ++++ .../phoenix/compile/GroupByCompiler.java | 253 ++++++++++-------- .../apache/phoenix/compile/QueryCompiler.java | 5 +- .../phoenix/compile/QueryCompilerTest.java | 35 ++- 4 files changed, 224 insertions(+), 115 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java index 172f9f7c0ae..6285b8ee4d9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByCaseIT.java @@ -33,6 +33,7 @@ import java.util.Properties; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.junit.Test; @@ -261,4 +262,49 @@ public void testGroupByArray() throws Exception { assertFalse(rs.next()); conn.close(); } + + @Test + public void testGroupByOrderPreserving() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("CREATE TABLE T (ORGANIZATION_ID char(15) not null, \n" + + "JOURNEY_ID char(15) not null, \n" + + "DATASOURCE SMALLINT not null, \n" + + "MATCH_STATUS TINYINT not null, \n" + + "EXTERNAL_DATASOURCE_KEY varchar(30), \n" + + "ENTITY_ID char(15) not null, \n" + + "CONSTRAINT PK PRIMARY KEY (\n" + + " ORGANIZATION_ID, \n" + + " JOURNEY_ID, \n" + + " DATASOURCE, \n" + + " MATCH_STATUS,\n" + + " EXTERNAL_DATASOURCE_KEY,\n" + + " ENTITY_ID))"); + conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abc', '666667777788888')"); + conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abcd', '666667777788889')"); + conn.createStatement().execute("UPSERT INTO T VALUES('000001111122222', '333334444455555', 0, 0, 'abc', '666667777788899')"); + conn.commit(); + String query = + "SELECT COUNT(1), EXTERNAL_DATASOURCE_KEY As DUP_COUNT\n" + + " FROM T \n" + + " WHERE JOURNEY_ID='333334444455555' AND \n" + + " DATASOURCE=0 AND MATCH_STATUS <= 1 and \n" + + " ORGANIZATION_ID='000001111122222' \n" + + " GROUP BY MATCH_STATUS, EXTERNAL_DATASOURCE_KEY \n" + + " HAVING COUNT(1) > 1"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals(2,rs.getInt(1)); + assertEquals("abc", rs.getString(2)); + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals( + "CLIENT PARALLEL 1-WAY RANGE SCAN OVER T ['000001111122222','333334444455555',0,*] - ['000001111122222','333334444455555',0,1]\n" + + " SERVER FILTER BY FIRST KEY ONLY\n" + + " SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [MATCH_STATUS, EXTERNAL_DATASOURCE_KEY]\n" + + "CLIENT MERGE SORT\n" + + "CLIENT FILTER BY COUNT(1) > 1",QueryUtil.getExplainPlan(rs)); + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java index 85478bf52fb..cabe7772647 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java @@ -57,14 +57,33 @@ public class GroupByCompiler { public static class GroupBy { private final List expressions; private final List keyExpressions; - private final String scanAttribName; - public static final GroupByCompiler.GroupBy EMPTY_GROUP_BY = new GroupBy(new GroupByBuilder()); + private final boolean isOrderPreserving; + public static final GroupByCompiler.GroupBy EMPTY_GROUP_BY = new GroupBy(new GroupByBuilder()) { + @Override + public void explain(List planSteps, Integer limit) { + } + @Override + public String getScanAttribName() { + return null; + } + }; + public static final GroupByCompiler.GroupBy UNGROUPED_GROUP_BY = new GroupBy(new GroupByBuilder().setIsOrderPreserving(true)) { + @Override + public void explain(List planSteps, Integer limit) { + planSteps.add(" SERVER AGGREGATE INTO SINGLE ROW"); + } + @Override + public String getScanAttribName() { + return BaseScannerRegionObserver.UNGROUPED_AGG; + } + }; private GroupBy(GroupByBuilder builder) { this.expressions = ImmutableList.copyOf(builder.expressions); - this.keyExpressions = ImmutableList.copyOf(builder.keyExpressions); - this.scanAttribName = builder.scanAttribName; - assert(expressions.size() == keyExpressions.size()); + this.keyExpressions = builder.expressions == builder.keyExpressions ? + this.expressions : builder.keyExpressions == null ? null : + ImmutableList.copyOf(builder.keyExpressions); + this.isOrderPreserving = builder.isOrderPreserving; } public List getExpressions() { @@ -76,129 +95,47 @@ public List getKeyExpressions() { } public String getScanAttribName() { - return scanAttribName; + return isOrderPreserving ? + BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS : + BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; } public boolean isEmpty() { return expressions.isEmpty(); } - public static class GroupByBuilder { - private String scanAttribName; - private List expressions = Collections.emptyList(); - private List keyExpressions = Collections.emptyList(); - - public GroupByBuilder() { - } - - public GroupByBuilder setScanAttribName(String scanAttribName) { - this.scanAttribName = scanAttribName; - return this; - } - - public GroupByBuilder setExpressions(List expressions) { - this.expressions = expressions; - return this; - } - - public GroupByBuilder setKeyExpressions(List keyExpressions) { - this.keyExpressions = keyExpressions; - return this; - } - - public GroupBy build() { - return new GroupBy(this); - } - } - public boolean isOrderPreserving() { - return !BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS.equals(scanAttribName); + return isOrderPreserving; } - public void explain(List planSteps, Integer limit) { - if (scanAttribName != null) { - if (BaseScannerRegionObserver.UNGROUPED_AGG.equals(scanAttribName)) { - planSteps.add(" SERVER AGGREGATE INTO SINGLE ROW"); - } else if (BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS.equals(scanAttribName)) { - planSteps.add(" SERVER AGGREGATE INTO DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S"))); - } else { - planSteps.add(" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S"))); + public GroupBy compile(StatementContext context, TupleProjector tupleProjector) throws SQLException { + boolean isOrderPreserving = this.isOrderPreserving; + if (isOrderPreserving) { + OrderPreservingTracker tracker = new OrderPreservingTracker(context, GroupBy.EMPTY_GROUP_BY, Ordering.UNORDERED, expressions.size(), tupleProjector); + for (int i = 0; i < expressions.size(); i++) { + Expression expression = expressions.get(i); + tracker.track(expression); } + + // This is true if the GROUP BY is composed of only PK columns. We further check here that + // there are no "gaps" in the PK columns positions used (i.e. we start with the first PK + // column and use each subsequent one in PK order). + isOrderPreserving = tracker.isOrderPreserving(); } - } - } - - /** - * Get list of columns in the GROUP BY clause. - * @param context query context kept between compilation of different query clauses - * @param statement SQL statement being compiled - * @return the {@link GroupBy} instance encapsulating the group by clause - * @throws ColumnNotFoundException if column name could not be resolved - * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables - */ - public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException { - List groupByNodes = statement.getGroupBy(); - /** - * Distinct can use an aggregate plan if there's no group by. - * Otherwise, we need to insert a step after the Merge that dedups. - * Order by only allowed on columns in the select distinct - */ - if (groupByNodes.isEmpty()) { - if (statement.isAggregate()) { - return new GroupBy.GroupByBuilder().setScanAttribName(BaseScannerRegionObserver.UNGROUPED_AGG).build(); - } - if (!statement.isDistinct()) { - return GroupBy.EMPTY_GROUP_BY; + if (isOrderPreserving) { + return this; } - groupByNodes = Lists.newArrayListWithExpectedSize(statement.getSelect().size()); - for (AliasedNode aliasedNode : statement.getSelect()) { - groupByNodes.add(aliasedNode.getNode()); - } - } - - // Accumulate expressions in GROUP BY - ExpressionCompiler compiler = - new ExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY); - List> groupBys = Lists.newArrayListWithExpectedSize(groupByNodes.size()); - OrderPreservingTracker tracker = new OrderPreservingTracker(context, GroupBy.EMPTY_GROUP_BY, Ordering.UNORDERED, groupByNodes.size(), tupleProjector); - for (int i = 0; i < groupByNodes.size(); i++) { - ParseNode node = groupByNodes.get(i); - Expression expression = node.accept(compiler); - if (!expression.isStateless()) { - if (compiler.isAggregate()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_GROUP_BY) - .setMessage(expression.toString()).build().buildException(); - } - tracker.track(expression); + List expressions = Lists.newArrayListWithExpectedSize(this.expressions.size()); + List keyExpressions = expressions; + List> groupBys = Lists.newArrayListWithExpectedSize(this.expressions.size()); + for (int i = 0; i < this.expressions.size(); i++) { + Expression expression = this.expressions.get(i); groupBys.add(new Pair(i,expression)); } - compiler.reset(); - } - - if (groupBys.isEmpty()) { - return GroupBy.EMPTY_GROUP_BY; - } - - boolean isRowKeyOrderedGrouping = isInRowKeyOrder && tracker.isOrderPreserving(); - List expressions = Lists.newArrayListWithExpectedSize(groupBys.size()); - List keyExpressions = expressions; - String groupExprAttribName; - // This is true if the GROUP BY is composed of only PK columns. We further check here that - // there are no "gaps" in the PK columns positions used (i.e. we start with the first PK - // column and use each subsequent one in PK order). - if (isRowKeyOrderedGrouping) { - groupExprAttribName = BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS; - for (Pair groupBy : groupBys) { - expressions.add(groupBy.getSecond()); - } - } else { - /* - * Otherwise, our coprocessor needs to collect all distinct groups within a region, sort them, and - * hold on to them until the scan completes. - */ - groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; /* + * If we're not ordered along the PK axis, our coprocessor needs to collect all distinct groups within + * a region, sort them, and hold on to them until the scan completes. * Put fixed length nullables at the end, so that we can represent null by the absence of the trailing * value in the group by key. If there is more than one, we'll need to convert the ones not at the end * into a Decimal so that we can use an empty byte array as our representation for null (which correctly @@ -276,9 +213,99 @@ public int compare(Pair gb1, Pair gb2) { // than one fixed and nullable types are used in a group by clause keyExpressions.set(i, CoerceExpression.create(expression, keyType)); } + + GroupBy groupBy = new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderPreserving).setExpressions(expressions).setKeyExpressions(keyExpressions).build(); + return groupBy; + } + + public static class GroupByBuilder { + private boolean isOrderPreserving; + private List expressions = Collections.emptyList(); + private List keyExpressions = Collections.emptyList(); + + public GroupByBuilder() { + } + + public GroupByBuilder setExpressions(List expressions) { + this.expressions = expressions; + return this; + } + + public GroupByBuilder setKeyExpressions(List keyExpressions) { + this.keyExpressions = keyExpressions; + return this; + } + + public GroupByBuilder setIsOrderPreserving(boolean isOrderPreserving) { + this.isOrderPreserving = isOrderPreserving; + return this; + } + + public GroupBy build() { + return new GroupBy(this); + } + } + + public void explain(List planSteps, Integer limit) { + if (isOrderPreserving) { + planSteps.add(" SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S"))); + } else { + planSteps.add(" SERVER AGGREGATE INTO DISTINCT ROWS BY " + getExpressions() + (limit == null ? "" : " LIMIT " + limit + " GROUP" + (limit.intValue() == 1 ? "" : "S"))); + } + } + } + + /** + * Get list of columns in the GROUP BY clause. + * @param context query context kept between compilation of different query clauses + * @param statement SQL statement being compiled + * @return the {@link GroupBy} instance encapsulating the group by clause + * @throws ColumnNotFoundException if column name could not be resolved + * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables + */ + public static GroupBy compile(StatementContext context, SelectStatement statement, boolean isOrderPreserving) throws SQLException { + List groupByNodes = statement.getGroupBy(); + /** + * Distinct can use an aggregate plan if there's no group by. + * Otherwise, we need to insert a step after the Merge that dedups. + * Order by only allowed on columns in the select distinct + */ + if (groupByNodes.isEmpty()) { + if (statement.isAggregate()) { + return GroupBy.UNGROUPED_GROUP_BY; + } + if (!statement.isDistinct()) { + return GroupBy.EMPTY_GROUP_BY; + } + + groupByNodes = Lists.newArrayListWithExpectedSize(statement.getSelect().size()); + for (AliasedNode aliasedNode : statement.getSelect()) { + groupByNodes.add(aliasedNode.getNode()); + } } - GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(expressions).setKeyExpressions(keyExpressions).build(); + // Accumulate expressions in GROUP BY + ExpressionCompiler compiler = + new ExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY); + List expressions = Lists.newArrayListWithExpectedSize(groupByNodes.size()); + for (int i = 0; i < groupByNodes.size(); i++) { + ParseNode node = groupByNodes.get(i); + Expression expression = node.accept(compiler); + if (!expression.isStateless()) { + if (compiler.isAggregate()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_GROUP_BY) + .setMessage(expression.toString()).build().buildException(); + } + expressions.add(expression); + } + compiler.reset(); + } + + if (expressions.isEmpty()) { + return GroupBy.EMPTY_GROUP_BY; + } + + GroupBy groupBy = new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderPreserving).setExpressions(expressions).setKeyExpressions(expressions).build(); return groupBy; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 9e756c8d009..113aa2bcea3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -530,7 +530,7 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState } Integer limit = LimitCompiler.compile(context, select); - GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector, isInRowKeyOrder); + GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder); // Optimize the HAVING clause by finding any group by expressions that can be moved // to the WHERE clause select = HavingCompiler.rewrite(context, select, groupBy); @@ -542,6 +542,9 @@ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectState } Set subqueries = Sets. newHashSet(); Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries); + // Recompile GROUP BY now that we've figured out our ScanRanges so we know + // definitively whether or not we'll traverse in row key order. + groupBy = groupBy.compile(context, innerPlanTupleProjector); context.setResolver(resolver); // recover resolver RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.emptyList() : targetColumns, where); OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, projector, groupBy == GroupBy.EMPTY_GROUP_BY ? innerPlanTupleProjector : null, isInRowKeyOrder); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index ce38cfddd28..4b756fa5c31 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -1351,7 +1351,7 @@ public void testInvalidNextValueFor() throws Exception { compileQuery(query, binds); fail("Compilation should have failed since this is an invalid usage of NEXT VALUE FOR: " + query); } catch (SQLException e) { - assertEquals(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), e.getErrorCode()); + assertEquals(query, SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR.getErrorCode(), e.getErrorCode()); } } } @@ -1825,6 +1825,39 @@ public void testGroupByOrderPreserving() throws Exception { } } + @Test + public void testGroupByOrderPreserving2() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("CREATE TABLE T (ORGANIZATION_ID char(15) not null, \n" + + "JOURNEY_ID char(15) not null, \n" + + "DATASOURCE SMALLINT not null, \n" + + "MATCH_STATUS TINYINT not null, \n" + + "EXTERNAL_DATASOURCE_KEY varchar(30), \n" + + "ENTITY_ID char(15) not null, \n" + + "CONSTRAINT PK PRIMARY KEY (\n" + + " ORGANIZATION_ID, \n" + + " JOURNEY_ID, \n" + + " DATASOURCE, \n" + + " MATCH_STATUS,\n" + + " EXTERNAL_DATASOURCE_KEY,\n" + + " ENTITY_ID))"); + String[] queries = { + "SELECT COUNT(1) As DUP_COUNT\n" + + " FROM T \n" + + " WHERE JOURNEY_ID='07ixx000000004J' AND \n" + + " DATASOURCE=0 AND MATCH_STATUS <= 1 and \n" + + " ORGANIZATION_ID='07ixx000000004J' \n" + + " GROUP BY MATCH_STATUS, EXTERNAL_DATASOURCE_KEY \n" + + " HAVING COUNT(1) > 1", + }; + String query; + for (int i = 0; i < queries.length; i++) { + query = queries[i]; + QueryPlan plan = conn.createStatement().unwrap(PhoenixStatement.class).compileQuery(query); + assertTrue("Expected group by to be order preserving: " + query, plan.getGroupBy().isOrderPreserving()); + } + } + @Test public void testNotGroupByOrderPreserving() throws Exception { Connection conn = DriverManager.getConnection(getUrl());