From 79b7948609d2c530afab48dcb3509638c933b83e Mon Sep 17 00:00:00 2001 From: Gerald Sangudi Date: Sun, 23 Sep 2018 09:01:18 -0700 Subject: [PATCH 1/2] PHOENIX-4917 Fix ClassCastException when projecting array elements in hash join --- .../coprocessor/HashJoinRegionScanner.java | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index f4224cd85a3..3489b7907b4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -50,6 +50,7 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; @@ -207,19 +208,19 @@ private void processResults(List result, boolean hasBatchLimit) throws IOE } if (tempTuples[i] == null) { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? - lhs : TupleProjector.mergeProjectedValue( - (ProjectedValueTuple) lhs, schema, tempDestBitSet, - null, joinInfo.getSchemas()[i], tempSrcBitSet[i], - joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); + lhs : mergeProjectedValue( + lhs, schema, tempDestBitSet, null, + joinInfo.getSchemas()[i], tempSrcBitSet[i], + joinInfo.getFieldPositions()[i]); offerResult(joined, projected, result); continue; } for (Tuple t : tempTuples[i]) { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? - lhs : TupleProjector.mergeProjectedValue( - (ProjectedValueTuple) lhs, schema, tempDestBitSet, - t, joinInfo.getSchemas()[i], tempSrcBitSet[i], - joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); + lhs : mergeProjectedValue( + lhs, schema, tempDestBitSet, t, + joinInfo.getSchemas()[i], tempSrcBitSet[i], + joinInfo.getFieldPositions()[i]); offerResult(joined, projected, result); } } @@ -353,4 +354,37 @@ private void offerResult(Tuple tuple, boolean projected, List result) { MultiKeyValueTuple multi = new MultiKeyValueTuple(cells); resultQueue.offer(multi); } + + // PHOENIX-4917 Merge array element cell through hash join. + // Merge into first cell, then reattach array cell. + private Tuple mergeProjectedValue( + Tuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet, Tuple src, + KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) + throws IOException { + + if (dest instanceof ProjectedValueTuple) { + return TupleProjector.mergeProjectedValue( + (ProjectedValueTuple) dest, destSchema, destBitSet, src, + srcSchema, srcBitSet, offset, useNewValueColumnQualifier); + } + + ProjectedValueTuple first = projector.projectResults( + new SingleKeyValueTuple(dest.getValue(0))); + ProjectedValueTuple merged = TupleProjector.mergeProjectedValue( + first, destSchema, destBitSet, src, srcSchema, + srcBitSet, offset, useNewValueColumnQualifier); + + int size = dest.size(); + if (size == 1) { + return merged; + } + + List cells = new ArrayList(size); + cells.add(merged.getValue(0)); + for (int i = 1; i < size; i++) { + cells.add(dest.getValue(i)); + } + MultiKeyValueTuple multi = new MultiKeyValueTuple(cells); + return multi; + } } From e6456c8a7f24255780ff7e25998367f13dff44e1 Mon Sep 17 00:00:00 2001 From: Gerald Sangudi Date: Mon, 1 Oct 2018 21:17:29 -0700 Subject: [PATCH 2/2] PHOENIX-4917 Test array element projection using a multi-node cluster --- .../ProjectArrayElemHashJoinSplitIT.java | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemHashJoinSplitIT.java diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemHashJoinSplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemHashJoinSplitIT.java new file mode 100644 index 00000000000..fad93fe69c9 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemHashJoinSplitIT.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ProjectArrayElemHashJoinSplitIT extends BaseUniqueNamesOwnClusterIT { + + @BeforeClass + public static void doSetup() throws Exception { + NUM_SLAVES_BASE = 3; + Map props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Test + public void testUnsalted() throws Exception { + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String table = createUnsalted(conn); + testTable(conn, table); + } finally { + conn.close(); + } + } + + private void testTable(Connection conn, String table) throws Exception { + + verifyExplain(conn, table, false, false); + verifyExplain(conn, table, false, true); + verifyExplain(conn, table, true, false); + verifyExplain(conn, table, true, true); + + verifyResults(conn, table, false, false); + verifyResults(conn, table, false, true); + verifyResults(conn, table, true, false); + verifyResults(conn, table, true, true); + } + + private String createUnsalted(Connection conn) throws Exception { + + String table = "UNSALTED_" + generateUniqueName(); + String create = "CREATE TABLE " + table + " (" + + " id INTEGER NOT NULL," + + " vals TINYINT[]," + + " CONSTRAINT pk PRIMARY KEY (id)" + + ") SPLIT ON (11, 21)"; + + conn.createStatement().execute(create); + + TableName tableName = TableName.valueOf(null, table); + + // Dummy split points; the table is already split + List splitPoints = new ArrayList(2); + splitPoints.add(new byte[1]); + splitPoints.add(new byte[1]); + + // Call this to move regions to servers + splitTable(tableName, splitPoints); + return table; + } + + private String getQuery(String table, boolean fullArray, boolean hashJoin) { + + String query = "SELECT t.id AS id, t.vals[1] v1, t.vals[2] v2, t.vals[3] v3, t.vals[4] v4" + + (fullArray ? ", t.vals AS vals" : "") + + " FROM " + table + " t " + + (hashJoin ? " JOIN " + table + " t2 ON (t.id = t2.id) " : "") + + " ORDER BY t.id" + ; + + return query; + } + + private void verifyExplain(Connection conn, String table, boolean fullArray, boolean hashJoin) + throws Exception { + + String query = "EXPLAIN " + getQuery(table, fullArray, hashJoin); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + + try { + String plan = QueryUtil.getExplainPlan(rs); + assertTrue(plan != null); + assertTrue(fullArray || plan.contains("SERVER ARRAY ELEMENT PROJECTION")); + assertTrue(hashJoin == plan.contains("JOIN")); + } finally { + rs.close(); + } + } + + private void verifyResults(Connection conn, String table, boolean fullArray, boolean hashJoin) + throws Exception { + + String upsert = "UPSERT INTO " + table + "(id, vals)" + + " VALUES(?, ARRAY[10, 20, 30, 40, 50])"; + PreparedStatement upsertStmt = conn.prepareStatement(upsert); + for (int i = 1; i <= 30; i++) { + upsertStmt.setInt(1, i); + upsertStmt.execute(); + } + conn.commit(); + + String query = getQuery(table, fullArray, hashJoin); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + + try { + for (int i = 1; i <= 30; i++) { + assertTrue(rs.next()); + assertEquals(i, rs.getInt("id")); + assertEquals(10, rs.getInt("v1")); + assertEquals(20, rs.getInt("v2")); + assertEquals(30, rs.getInt("v3")); + assertEquals(40, rs.getInt("v4")); + + if (fullArray) { + java.sql.Array array = rs.getArray("vals"); + assertTrue(array != null); + Object obj = array.getArray(); + assertTrue(obj != null); + assertTrue(obj.getClass().isArray()); + assertEquals(5, java.lang.reflect.Array.getLength(obj)); + } + } + + assertFalse(rs.next()); + } finally { + rs.close(); + } + } + + private void dropTable(Connection conn, String table) throws Exception { + + String drop = "DROP TABLE " + table; + Statement stmt = conn.createStatement(); + stmt.execute(drop); + stmt.close(); + } +}