From c89d815f64bba2cd85a2b03ed200338f9a47f63e Mon Sep 17 00:00:00 2001 From: maryannxue Date: Mon, 18 Aug 2014 23:04:00 -0400 Subject: [PATCH] PHOENIX-852 Optimize child/parent foreign key joins --- .../apache/phoenix/end2end/HashJoinIT.java | 224 +++++++++++++++++- .../apache/phoenix/compile/JoinCompiler.java | 20 +- .../apache/phoenix/compile/QueryCompiler.java | 65 ++++- .../apache/phoenix/compile/WhereCompiler.java | 37 +++ .../apache/phoenix/execute/HashJoinPlan.java | 65 ++++- .../apache/phoenix/join/HashCacheClient.java | 13 +- .../org/apache/phoenix/parse/HintNode.java | 8 + 7 files changed, 420 insertions(+), 12 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java index e8802539a93..1f045bdaff0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java @@ -229,7 +229,8 @@ public static Collection data() { " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " SERVER FILTER BY QUANTITY < 5000\n" + " BUILD HASH TABLE 1\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME, + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY item_id IN (O.item_id)", /* * testSelfJoin() * SELECT i2.item_id, i1.name FROM joinItemTable i1 @@ -310,7 +311,8 @@ public static Collection data() { " SERVER FILTER BY NAME != 'T3'\n" + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + " BUILD HASH TABLE 0\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME, + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)", /* * testJoinWithSubqueryAndAggregation() * SELECT i.name, sum(quantity) FROM joinOrderTable o @@ -440,6 +442,44 @@ public static Collection 
data() { " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))", }}); testCases.add(new String[][] { { @@ -647,7 +687,8 @@ public static Collection data() { " SERVER FILTER BY NAME != 'T3'\n" + " PARALLEL EQUI-JOIN 1 
HASH TABLES:\n" + " BUILD HASH TABLE 0\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME, + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)", /* * testJoinWithSubqueryAndAggregation() * SELECT i.name, sum(quantity) FROM joinOrderTable o @@ -778,6 +819,44 @@ public static Collection data() { " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + 
"CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))", }}); testCases.add(new String[][] { { @@ -1000,7 +1079,8 @@ public static Collection data() { " CLIENT MERGE SORT\n" + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + " BUILD HASH TABLE 0\n" + - " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME, + " CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" + + " DYNAMIC SERVER FILTER BY customer_id IN (O.customer_id)", /* * testJoinWithSubqueryAndAggregation() * SELECT i.name, sum(quantity) FROM joinOrderTable o @@ -1136,6 +1216,44 @@ public static Collection data() { " BUILD HASH TABLE 1(DELAYED EVALUATION)\n" + " CLIENT PARALLEL 1-WAY FULL SCAN OVER "+ JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" + " JOIN-SCANNER 4 ROW LIMIT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE 
SORT\n" + + " DYNAMIC SERVER FILTER BY COL0 IN (RHS.COL2)", + /* + * testJoinWithScanRangeOptimization() + * SELECT (*JOIN_SCAN_RANGE_OPTIMIZATION*) lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 + * FROM TEMP_TABLE_COMPOSITE_PK lhs + * JOIN TEMP_TABLE_COMPOSITE_PK rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2 + */ + "CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + "CLIENT MERGE SORT\n" + + " PARALLEL EQUI-JOIN 1 HASH TABLES:\n" + + " BUILD HASH TABLE 0\n" + + " CLIENT PARALLEL 4-WAY FULL SCAN OVER TEMP_TABLE_COMPOSITE_PK\n" + + " CLIENT MERGE SORT\n" + + " DYNAMIC SERVER FILTER BY (COL0, COL1) IN ((RHS.COL1, RHS.COL2))", }}); return testCases; } @@ -3596,6 +3714,104 @@ public void testJoinWithLimit() throws Exception { conn.close(); } } + + @Test + public void testJoinWithScanRangeOptimization() throws Exception { + String tempTableWithCompositePK = "TEMP_TABLE_COMPOSITE_PK"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.createStatement().execute("CREATE TABLE " + tempTableWithCompositePK + + " (col0 INTEGER NOT NULL, " + + " col1 INTEGER NOT NULL, " + + " col2 INTEGER " + + " CONSTRAINT pk PRIMARY KEY (col0, col1)) " + + " SALT_BUCKETS=4"); + + PreparedStatement upsertStmt = conn.prepareStatement( + "upsert into " + tempTableWithCompositePK + "(col0, col1, col2) " + "values (?, ?, ?)"); + for (int i = 0; i < 3; i++) { + upsertStmt.setInt(1, i + 1); + upsertStmt.setInt(2, i + 2); + upsertStmt.setInt(3, i + 3); + upsertStmt.execute(); + } + conn.commit(); + + // No leading part of PK + String query = "SELECT /*+ JOIN_SCAN_RANGE_OPTIMIZATION*/ lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 FROM " + + tempTableWithCompositePK + " lhs JOIN " + + tempTableWithCompositePK + " rhs ON lhs.col1 = rhs.col2"; + PreparedStatement statement = conn.prepareStatement(query); + ResultSet rs = statement.executeQuery(); + 
assertTrue(rs.next()); + assertEquals(rs.getInt(1), 2); + assertEquals(rs.getInt(2), 3); + assertEquals(rs.getInt(3), 4); + assertEquals(rs.getInt(4), 1); + assertEquals(rs.getInt(5), 2); + assertEquals(rs.getInt(6), 3); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 3); + assertEquals(rs.getInt(2), 4); + assertEquals(rs.getInt(3), 5); + assertEquals(rs.getInt(4), 2); + assertEquals(rs.getInt(5), 3); + assertEquals(rs.getInt(6), 4); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(plans[21], QueryUtil.getExplainPlan(rs)); + + // Leading part of PK + query = "SELECT /*+ JOIN_SCAN_RANGE_OPTIMIZATION*/ lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 FROM " + + tempTableWithCompositePK + " lhs JOIN " + + tempTableWithCompositePK + " rhs ON lhs.col0 = rhs.col2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 3); + assertEquals(rs.getInt(2), 4); + assertEquals(rs.getInt(3), 5); + assertEquals(rs.getInt(4), 1); + assertEquals(rs.getInt(5), 2); + assertEquals(rs.getInt(6), 3); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(plans[22], QueryUtil.getExplainPlan(rs)); + + // All parts of PK + query = "SELECT /*+ JOIN_SCAN_RANGE_OPTIMIZATION*/ lhs.col0, lhs.col1, lhs.col2, rhs.col0, rhs.col1, rhs.col2 FROM " + + tempTableWithCompositePK + " lhs JOIN " + + tempTableWithCompositePK + " rhs ON lhs.col0 = rhs.col1 AND lhs.col1 = rhs.col2"; + statement = conn.prepareStatement(query); + rs = statement.executeQuery(); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 2); + assertEquals(rs.getInt(2), 3); + assertEquals(rs.getInt(3), 4); + assertEquals(rs.getInt(4), 1); + assertEquals(rs.getInt(5), 2); + assertEquals(rs.getInt(6), 3); + assertTrue(rs.next()); + assertEquals(rs.getInt(1), 3); + assertEquals(rs.getInt(2), 4); + assertEquals(rs.getInt(3), 5); + 
assertEquals(rs.getInt(4), 2); + assertEquals(rs.getInt(5), 3); + assertEquals(rs.getInt(6), 4); + + assertFalse(rs.next()); + + rs = conn.createStatement().executeQuery("EXPLAIN " + query); + assertEquals(plans[23], QueryUtil.getExplainPlan(rs)); + } finally { + conn.close(); + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 810e1cda047..81d169c0ac1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -83,7 +83,6 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; @@ -421,6 +420,21 @@ public boolean hasPostReference() { return false; } + + public boolean hasFilters() { + if (!postFilters.isEmpty()) + return true; + + if (!hasRightJoin && table.hasFilters()) + return true; + + for (JoinTable joinTable : prefilterAcceptedTables) { + if (joinTable.hasFilters()) + return true; + } + + return false; + } } public static class JoinSpec { @@ -664,6 +678,10 @@ public SelectStatement getAsSubquery() throws SQLException { return NODE_FACTORY.select(from, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence()); } + public boolean hasFilters() { + return isSubselect() ? 
(!postFilters.isEmpty() || subselect.getWhere() != null || subselect.getHaving() != null) : !preFilters.isEmpty(); + } + public boolean isFlat() { return subselect == null || JoinCompiler.isFlat(subselect); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 20c0acd9f25..b532e288901 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -20,6 +20,7 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Collections; +import java.util.Comparator; import java.util.List; import org.apache.hadoop.hbase.client.Scan; @@ -37,6 +38,8 @@ import org.apache.phoenix.execute.HashJoinPlan; import org.apache.phoenix.execute.ScanPlan; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.RowValueConstructorExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -58,6 +61,8 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.ScanUtil; +import com.google.common.collect.Lists; + /** @@ -83,6 +88,8 @@ public class QueryCompiler { private final List targetColumns; private final ParallelIteratorFactory parallelIteratorFactory; private final SequenceManager sequenceManager; + private final boolean forceJoinScanRangeOptimization; + private final boolean disableJoinScanRangeOptimization; public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { this(statement, select, resolver, Collections.emptyList(), null, new SequenceManager(statement)); @@ -104,6 +111,8 @@ public QueryCompiler(PhoenixStatement 
statement, SelectStatement select, ColumnR } this.originalScan = ScanUtil.newScan(scan); + this.forceJoinScanRangeOptimization = select.getHint().hasHint(Hint.JOIN_SCAN_RANGE_OPTIMIZATION); + this.disableJoinScanRangeOptimization = select.getHint().hasHint(Hint.NO_JOIN_SCAN_RANGE_OPTIMIZATION); } /** @@ -179,6 +188,8 @@ protected QueryPlan compileJoinQuery(StatementContext context, List bind ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count]; List[] joinExpressions = new List[count]; List[] hashExpressions = new List[count]; + Expression[] scanRangeOptimizationLhsExpressions = new Expression[count]; + Expression[] scanRangeOptimizationRhsExpressions = new Expression[count]; JoinType[] joinTypes = new JoinType[count]; PTable[] tables = new PTable[count]; int[] fieldPositions = new int[count]; @@ -211,6 +222,9 @@ protected QueryPlan compileJoinQuery(StatementContext context, List bind Pair, List> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver); joinExpressions[i] = joinConditions.getFirst(); hashExpressions[i] = joinConditions.getSecond(); + Pair scanRangeOptimizationExpressions = extractScanRangeOptimizationExpressions(context, tableRef.getTable(), joinSpec.getType(), joinSpec.getJoinTable(), joinExpressions[i], hashExpressions[i]); + scanRangeOptimizationLhsExpressions[i] = scanRangeOptimizationExpressions.getFirst(); + scanRangeOptimizationRhsExpressions[i] = scanRangeOptimizationExpressions.getSecond(); joinTypes[i] = joinSpec.getType(); if (i < count - 1) { fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 
0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size())); @@ -228,7 +242,7 @@ protected QueryPlan compileJoinQuery(StatementContext context, List bind limit = LimitCompiler.compile(context, query); } HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection); - return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, joinPlans, clientProjectors); + return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, scanRangeOptimizationLhsExpressions, scanRangeOptimizationRhsExpressions, joinPlans, clientProjectors); } JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1); @@ -283,13 +297,60 @@ protected QueryPlan compileJoinQuery(StatementContext context, List bind limit = LimitCompiler.compile(context, rhs); } HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection); - return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector}); + Pair scanRangeOptimizationExpressions = extractScanRangeOptimizationExpressions(context, rhsTableRef.getTable(), type, lhsJoin, joinExpressions, hashExpressions); + return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new Expression[] {scanRangeOptimizationExpressions.getFirst()}, new Expression[] {scanRangeOptimizationExpressions.getSecond()}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector}); } // Do not support queries like "A right join B left join C" with hash-joins. 
throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported."); } + private Pair extractScanRangeOptimizationExpressions(StatementContext context, PTable table, JoinType type, JoinTable joinTable, final List joinExpressions, final List hashExpressions) { + if (type != JoinType.Inner || + (!forceJoinScanRangeOptimization + && (disableJoinScanRangeOptimization || !joinTable.hasFilters()))) + return new Pair(null, null); + + List rowkeyColumnIndexes = Lists.newArrayList(); + for (int i = 0; i < joinExpressions.size(); i++) { + Expression joinExpression = joinExpressions.get(i); + if (joinExpression instanceof RowKeyColumnExpression) { + rowkeyColumnIndexes.add(i); + } + } + Collections.sort(rowkeyColumnIndexes, new Comparator() { + @Override + public int compare(Integer l, Integer r) { + return ((RowKeyColumnExpression) joinExpressions.get(l)).getPosition() - ((RowKeyColumnExpression) joinExpressions.get(r)).getPosition(); + } + }); + int positionOffset = (table.getBucketNum() ==null ? 0 : 1) + (context.getConnection().getTenantId() != null && table.isMultiTenant() ? 1 : 0) + (table.getViewIndexId() == null ? 
0 : 1); + int position = 0; + for (Integer index : rowkeyColumnIndexes) { + RowKeyColumnExpression exp = (RowKeyColumnExpression) joinExpressions.get(index); + if (exp.getPosition() != position + positionOffset) { + break; + } + position++; + } + + if (position == 0) + return new Pair(null, null); + + if (position == 1) + return new Pair(joinExpressions.get(rowkeyColumnIndexes.get(0)), hashExpressions.get(rowkeyColumnIndexes.get(0))); + + List lChildren = Lists.newArrayList(); + List rChildren = Lists.newArrayList(); + for (int i = 0; i < position; i++) { + Integer index = rowkeyColumnIndexes.get(i); + lChildren.add(joinExpressions.get(index)); + rChildren.add(hashExpressions.get(index)); + } + + return new Pair(new RowValueConstructorExpression(lChildren, false), new RowValueConstructorExpression(rChildren, false)); + } + protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException { ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection()); subquery = StatementNormalizer.normalize(subquery, resolver); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index b9a53f8da5d..8ed49c41fd1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -20,6 +20,7 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.hadoop.hbase.client.Scan; @@ -110,6 +111,42 @@ public static Expression compile(StatementContext context, FilterableStatement s return expression; } + /** + * Optimize scan ranges by applying dynamically generated filter expressions. 
+ * @param context the shared context during query compilation + * @param statement the statement whose WHERE clause, if present, is compiled and combined with the dynamic filters + * @throws SQLException if mismatched types are found, bind values do not match binds, + * or invalid function arguments are encountered. + * @throws SQLFeatureNotSupportedException if an unsupported expression is encountered. + * @throws ColumnNotFoundException if column name could not be resolved + * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables + */ + public static Expression optimize(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List dynamicFilters) throws SQLException { + List filters = Lists.newArrayList(dynamicFilters); + Set extractedNodes = Sets.newHashSet(); + WhereExpressionCompiler whereCompiler = new WhereExpressionCompiler(context); + ParseNode where = statement.getWhere(); + if (where != null) { + filters.add(where.accept(whereCompiler)); + } + Expression expression = filters.size() == 1 ? filters.get(0) : AndExpression.create(filters); + if (whereCompiler.isAggregate()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_IN_WHERE).build().buildException(); + } + if (expression.getDataType() != PDataType.BOOLEAN) { + throw TypeMismatchException.newException(PDataType.BOOLEAN, expression.getDataType(), expression.toString()); + } + if (viewWhere != null) { + WhereExpressionCompiler viewWhereCompiler = new WhereExpressionCompiler(context, true); + Expression viewExpression = viewWhere.accept(viewWhereCompiler); + expression = AndExpression.create(Lists.newArrayList(expression, viewExpression)); + } + + expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes); + + return expression; + } + private static class WhereExpressionCompiler extends ExpressionCompiler { private boolean disambiguateWithFamily; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 401c15b1368..b61984fbe93 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -29,15 +29,20 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.InListExpression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.job.JobManager.JobCallable; @@ -45,10 +50,15 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.join.TupleProjector; import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.ParseNode; +import org.apache.phoenix.parse.SQLParser; +import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.SQLCloseable; 
import org.apache.phoenix.util.SQLCloseables; @@ -62,17 +72,22 @@ public class HashJoinPlan implements QueryPlan { private final BasicQueryPlan plan; private final HashJoinInfo joinInfo; private final List[] hashExpressions; + private final Expression[] inListLhsExpressions; + private final Expression[] inListRhsExpressions; private final QueryPlan[] hashPlans; private final TupleProjector[] clientProjectors; public HashJoinPlan(FilterableStatement statement, BasicQueryPlan plan, HashJoinInfo joinInfo, - List[] hashExpressions, QueryPlan[] hashPlans, + List[] hashExpressions, Expression[] inListLhsExpressions, + Expression[] inListRhsExpressions, QueryPlan[] hashPlans, TupleProjector[] clientProjectors) { this.statement = statement; this.plan = plan; this.joinInfo = joinInfo; this.hashExpressions = hashExpressions; + this.inListLhsExpressions = inListLhsExpressions; + this.inListRhsExpressions = inListRhsExpressions; this.hashPlans = hashPlans; this.clientProjectors = clientProjectors; } @@ -106,18 +121,24 @@ public ResultIterator iterator() throws SQLException { ExecutorService executor = services.getExecutor(); List> futures = new ArrayList>(count); List dependencies = new ArrayList(count); + List inListExpressions = new ArrayList(); + @SuppressWarnings("unchecked") + final List[] inListRhsValues = new List[count]; final int maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); final AtomicLong firstJobEndTime = new AtomicLong(0); SQLException firstException = null; for (int i = 0; i < count; i++) { final int index = i; + if (inListRhsExpressions[index] != null) { + inListRhsValues[index] = new ArrayList(); + } futures.add(executor.submit(new JobCallable() { @Override public ServerCache call() throws Exception { QueryPlan hashPlan = hashPlans[index]; ServerCache cache = hashClient.addHashCache(ranges, hashPlan.iterator(), - 
clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef()); + clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef(), inListRhsExpressions[index], inListRhsValues[index]); long endTime = System.currentTimeMillis(); boolean isSet = firstJobEndTime.compareAndSet(0, endTime); if (!isSet && (endTime - firstJobEndTime.get()) > maxServerCacheTimeToLive) { @@ -137,6 +158,9 @@ public Object getJobId() { ServerCache cache = futures.get(i).get(); joinIds[i].set(cache.getId()); dependencies.add(cache); + if (inListRhsExpressions[i] != null) { + inListExpressions.add(createInListExpression(inListLhsExpressions[i], inListRhsExpressions[i], inListRhsValues[i], plan.getContext().getTempPtr())); + } } catch (InterruptedException e) { if (firstException == null) { firstException = new SQLException("Hash plan [" + i + "] execution interrupted.", e); @@ -154,10 +178,32 @@ public Object getJobId() { } HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo); + if (!inListExpressions.isEmpty()) { + StatementContext context = plan.getContext(); + PTable table = context.getCurrentTable().getTable(); + ParseNode viewWhere = table.getViewStatement() == null ? 
null : new SQLParser(table.getViewStatement()).parseQuery().getWhere(); + context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (plan.getStatement()), plan.getContext().getConnection())); + WhereCompiler.optimize(plan.getContext(), plan.getStatement(), viewWhere, inListExpressions); + if (LOG.isDebugEnabled()) { + LOG.debug("DYNAMIC SCAN RANGE: " + plan.getExplainPlan().getPlanSteps().get(0)); + } + } return plan.iterator(dependencies); } + private Expression createInListExpression(Expression lhsExpression, + Expression rhsExpression, List rhsValues, + ImmutableBytesWritable ptr) throws SQLException { + List children = Lists.newArrayList(lhsExpression); + PDataType type = rhsExpression.getDataType(); + for (ImmutableBytesWritable value : rhsValues) { + children.add(LiteralExpression.newConstant(type.toObject(value), type)); + } + + return InListExpression.create(children, false, ptr); + } + @Override public long getEstimatedSize() { return plan.getEstimatedSize(); @@ -183,6 +229,21 @@ public ExplainPlan getExplainPlan() throws SQLException { planSteps.add(" " + step); } } + String dynamicFilters = null; + int filterCount = 0; + for (int i = 0; i < count; i++) { + if (inListLhsExpressions[i] != null) { + if (filterCount == 1) { + dynamicFilters = "(" + dynamicFilters + ")"; + } + String filter = inListLhsExpressions[i].toString() + " IN (" + inListRhsExpressions[i].toString() + ")"; + dynamicFilters = dynamicFilters == null ? 
filter : (dynamicFilters + " AND (" + filter + ")"); + filterCount++; + } + } + if (dynamicFilters != null) { + planSteps.add(" DYNAMIC SERVER FILTER BY " + dynamicFilters); + } if (joinInfo.getPostJoinFilterExpression() != null) { planSteps.add(" AFTER-JOIN SERVER FILTER BY " + joinInfo.getPostJoinFilterExpression().toString()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java index 909e7721ee2..253ea7f5154 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java @@ -69,16 +69,16 @@ public HashCacheClient(PhoenixConnection connection) { * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed * size */ - public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List onExpressions, TableRef cacheUsingTableRef) throws SQLException { + public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List onExpressions, TableRef cacheUsingTableRef, Expression inListRhsExpression, List inListRhsValues) throws SQLException { /** * Serialize and compress hashCacheTable */ ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - serialize(ptr, iterator, projector, estimatedSize, onExpressions); + serialize(ptr, iterator, projector, estimatedSize, onExpressions, inListRhsExpression, inListRhsValues); return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef); } - private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List onExpressions) throws SQLException { + private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List onExpressions, 
Expression inListRhsExpression, List inListRhsValues) throws SQLException { long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE); estimatedSize = Math.min(estimatedSize, maxSize); if (estimatedSize > Integer.MAX_VALUE) { @@ -105,6 +105,13 @@ private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, Tupl if (baOut.size() > maxSize) { throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)"); } + if (inListRhsExpression != null) { + ImmutableBytesWritable value = new ImmutableBytesWritable(); + inListRhsExpression.reset(); + if (inListRhsExpression.evaluate(result, value)) { + inListRhsValues.add(value); + } + } nRows++; } TrustedByteArrayOutputStream sizeOut = new TrustedByteArrayOutputStream(Bytes.SIZEOF_INT); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index 0d2ede977f6..a6575939e91 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -92,6 +92,14 @@ public enum Hint { * between 2 selected columns this will be give better performance. */ NO_SEEK_TO_COLUMN, + /** + * Forces dynamic scan range optimization for join queries. + */ + JOIN_SCAN_RANGE_OPTIMIZATION, + /** + * Avoids dynamic scan range optimization for join queries. + */ + NO_JOIN_SCAN_RANGE_OPTIMIZATION, }; private final Map hints;