From 2fb57ee3b40b93936cde93a5b7f68524dead8829 Mon Sep 17 00:00:00 2001 From: Gokcen Iskender Date: Tue, 11 Jun 2019 16:13:25 -0700 Subject: [PATCH] PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables --- .../end2end/IndexToolForPartialBuildIT.java | 91 ++-- .../end2end/index/ImmutableIndexIT.java | 186 ++++++- .../apache/phoenix/execute/MutationState.java | 490 +++++++++++------- .../hbase/index/IndexRegionObserver.java | 2 +- .../apache/phoenix/index/IndexMaintainer.java | 3 +- .../query/ConnectionQueryServicesImpl.java | 3 +- .../phoenix/query/QueryServicesOptions.java | 1 + .../org/apache/phoenix/util/IndexUtil.java | 63 ++- .../org/apache/phoenix/util/TestUtil.java | 13 +- 9 files changed, 571 insertions(+), 281 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java index 70812c3036a..06649e93e51 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java @@ -17,13 +17,10 @@ */ package org.apache.phoenix.end2end; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; @@ -34,21 +31,18 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; -import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; @@ -60,6 +54,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; +import org.apache.phoenix.util.TestUtil; import org.junit.BeforeClass; import org.junit.Test; @@ -83,7 +78,6 @@ public IndexToolForPartialBuildIT() { public static Map getServerProperties() { Map serverProps = Maps.newHashMapWithExpectedSize(10); serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, 
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); - serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName()); serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0"); serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000"); @@ -105,14 +99,14 @@ public void testSecondaryIndex() throws Exception { String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName); - final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME); + final String indxTable = String.format("%s_IDX", dataTableName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString()); props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled)); final Connection conn = DriverManager.getConnection(getUrl(), props); Statement stmt = conn.createStatement(); - try { + try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();){ if (isNamespaceEnabled) { conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName); } @@ -121,25 +115,37 @@ public void testSecondaryIndex() throws Exception { fullTableName, tableDDLOptions)); String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName); PreparedStatement stmt1 = conn.prepareStatement(upsertQuery); - FailingRegionObserver.FAIL_WRITE = false; // insert two rows upsertRow(stmt1, 1000); upsertRow(stmt1, 2000); conn.commit(); stmt.execute(String.format("CREATE INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName)); - FailingRegionObserver.FAIL_WRITE = true; upsertRow(stmt1, 3000); upsertRow(stmt1, 4000); upsertRow(stmt1, 5000); - try { - conn.commit(); - fail(); - } catch (SQLException e) {} catch (Exception e) {} + conn.commit(); + + // delete these indexes + PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable)); + Table hTable = admin.getConnection(). + getTable(TableName.valueOf(pindexTable.getPhysicalName().toString())); + + Scan scan = new Scan(); + ResultScanner scanner = hTable.getScanner(scan); + int cnt=0; + for (Result res = scanner.next(); res != null; res = scanner.next()) { + cnt++; + if (cnt > 2) { + hTable.delete(new Delete(res.getRow())); + } + } + hTable.close(); + TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString()); + conn.createStatement() .execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName)); - FailingRegionObserver.FAIL_WRITE = false; ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable, new String[] { PTableType.INDEX.toString() }); assertTrue(rs.next()); @@ -150,21 +156,6 @@ public void testSecondaryIndex() throws Exception { upsertRow(stmt1, 6000); upsertRow(stmt1, 7000); conn.commit(); - - rs = conn.createStatement() - .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + "," - + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM " - +"\""+ SYSTEM_CATALOG_SCHEMA + "\"." 
+ SYSTEM_CATALOG_TABLE + " (" - + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where " - + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and " - + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'")); - rs.next(); - PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable)); - assertEquals(PIndexState.BUILDING, pindexTable.getIndexState()); - assertEquals(rs.getLong(1), pindexTable.getTimeStamp()); - - //assert disabled timestamp - assertEquals(0, rs.getLong(2)); String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName); rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql); @@ -173,7 +164,7 @@ public void testSecondaryIndex() throws Exception { // assert we are pulling from data table. assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled); - rs = stmt1.executeQuery(selectSql); + rs = stmt1.executeQuery(selectSql); for (int i = 1; i <= 7; i++) { assertTrue(rs.next()); assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1)); @@ -260,24 +251,4 @@ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException stmt.executeUpdate(); } - - public static class FailingRegionObserver extends SimpleRegionObserver { - public static volatile boolean FAIL_WRITE = false; - public static final String INDEX_NAME = "IDX"; - @Override - public void preBatchMutate(ObserverContext c, MiniBatchOperationInProgress miniBatchOp) throws HBaseIOException { - if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - Mutation operation = miniBatchOp.getOperation(0); - Set keySet = operation.getFamilyCellMap().keySet(); - for(byte[] family: keySet) { - if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) { - throw new DoNotRetryIOException(); - } - } - } - - } - } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 8be7b2d0562..2d85dbc4063 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -17,15 +17,22 @@ */ package org.apache.phoenix.end2end.index; +import static org.apache.phoenix.end2end.IndexToolIT.assertExplainPlan; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME; +import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.apache.phoenix.util.TestUtil.getRowCount; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Iterator; @@ -40,23 +47,37 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Durability; 
+import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -107,15 +128,16 @@ public static void doSetup() throws Exception { Map clientProps = Maps.newHashMapWithExpectedSize(2); clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true"); clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000"); + clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true"); setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") // name is used by failsafe as file name in reports public static Collection data() { return TestUtil.filterTxParamData( - Arrays.asList(new Object[][] { + Arrays.asList(new Object[][] { { false, false, null, false }, { false, false, null, true }, - { false, true, "OMID", false }, + { false, true, "OMID", false }, { false, true, "TEPHRA", false }, { false, true, "TEPHRA", true }, { true, false, null, false }, { true, false, null, true }, { true, true, "TEPHRA", false }, { true, true, "TEPHRA", true }, @@ -259,7 +281,165 @@ private void assertIndexMutations(Connection conn) throws SQLException { (transactionProvider != null && transactionProvider.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)), iterator.hasNext()); } - + + private void createAndPopulateTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName, + int numOfRowsToInsert, String storageProps) + throws Exception { + String tableOptions = tableDDLOptions; + if (storageProps != null) { + tableOptions += " ,IMMUTABLE_STORAGE_SCHEME=" + storageProps; + } + String ddl = "CREATE TABLE " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableOptions; + INDEX_DDL = + "CREATE " + " INDEX IF NOT EXISTS " + SchemaUtil.getTableNameFromFullName(indexName) + + " ON " + tableName + " (long_pk, varchar_pk)" + + " INCLUDE (long_col1, long_col2) "; + + conn.createStatement().execute(ddl); + conn.createStatement().execute(INDEX_DDL); + 
upsertRows(conn, tableName, numOfRowsToInsert); + conn.commit(); + + TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE); + } + + @Test + public void testGlobalImmutableIndexCreate() throws Exception { + if (localIndex || transactionProvider != null) { + return; + } + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + ArrayList immutableStorageProps = new ArrayList(); + immutableStorageProps.add(null); + if (!tableDDLOptions.contains(IMMUTABLE_STORAGE_SCHEME)) { + immutableStorageProps.add(SINGLE_CELL_ARRAY_WITH_OFFSETS.toString()); + } + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(true); + for (String storageProp : immutableStorageProps) { + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IND_" + generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + TABLE_NAME = fullTableName; + int numRows = 1; + createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, + numRows, storageProp); + + ResultSet rs; + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(numRows, rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals(numRows, rs.getInt(1)); + assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, + IndexRegionObserver.VERIFIED_BYTES)); + + // Now try to fail Phase1 and observe that index state is not DISABLED + try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { + admin.disableTable(TableName.valueOf(fullIndexName)); + boolean isWriteOnDisabledIndexFailed = false; + try { + upsertRows(conn, fullTableName, numRows); + } catch (SQLException ex) { + isWriteOnDisabledIndexFailed = true; + } + assertEquals(true, isWriteOnDisabledIndexFailed); + PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName); + assertEquals(PIndexState.ACTIVE, indexState); + + } + } + } + } + + @Test + public void testGlobalImmutableIndexDelete() throws Exception { + if (localIndex || transactionProvider != null) { + return; + } + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IND_" + generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + TABLE_NAME = fullTableName; + try (Connection conn = DriverManager.getConnection(getUrl(), props); + Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) { + conn.setAutoCommit(true); + int numRows = 2; + createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, numRows, null); + + String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'"; + conn.createStatement().execute(dml); + conn.commit(); + ResultSet rs; + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(numRows - 1, rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals(numRows - 1, rs.getInt(1)); + + // Force delete to fail (data removed but operation 
failed) on data table and check index table row remains as unverified + TestUtil.addCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class); + dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar2'"; + boolean isDeleteFailed = false; + try { + conn.createStatement().execute(dml); + } catch (Exception ex) { + isDeleteFailed = true; + } + assertEquals(true, isDeleteFailed); + TestUtil.removeCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class); + assertEquals(numRows - 1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(Bytes.toBytes(fullIndexName)), false)); + assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.UNVERIFIED_BYTES)); + + // Now delete via hbase, read from unverified index and see that we don't get any data + admin.disableTable(TableName.valueOf(fullTableName)); + admin.truncateTable(TableName.valueOf(fullTableName), true); + String selectFromIndex = "SELECT long_pk, varchar_pk, long_col1 FROM " + TABLE_NAME + " WHERE varchar_pk='varchar2' AND long_pk=2"; + rs = + conn.createStatement().executeQuery( + "EXPLAIN " + selectFromIndex); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertExplainPlan(false, actualExplainPlan, fullTableName, fullIndexName); + + rs = conn.createStatement().executeQuery(selectFromIndex); + assertFalse(rs.next()); + } + } + + public static class DeleteFailingRegionObserver extends SimpleRegionObserver { + @Override + public void preBatchMutate(ObserverContext c, MiniBatchOperationInProgress miniBatchOp) throws + IOException { + throw new DoNotRetryIOException(); + } + } + + public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes) + throws IOException, SQLException { + PTable table = PhoenixRuntime.getTable(conn, tableName); + byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table); + byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(table.getPhysicalName().getBytes()); + Scan scan = new Scan(); + scan.addColumn(emptyCF, emptyCQ); + ResultScanner resultScanner = htable.getScanner(scan); + + for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) { + if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length, + valueBytes, 0, valueBytes.length) != 0) { + return false; + } + } + return true; + } + // This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back. 
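    // Both tests above rely on the same contract: rows written to a consistent global index start
    // out with their empty column stamped IndexRegionObserver.UNVERIFIED_BYTES and are flipped to
    // VERIFIED_BYTES only after the data-table write commits, e.g.
    //   verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES)
    // returns true after a clean commit, while a failed data-table write leaves the surviving index
    // rows unverified so that reads fall back to (or are repaired from) the data table.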
@Ignore diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 2769ca12697..2c909a95642 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -55,6 +58,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; @@ -94,10 +98,13 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.apache.phoenix.util.EncodedColumnsUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.apache.phoenix.util.SQLCloseable; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TransactionUtil; @@ -896,13 +903,20 @@ private void send(Iterator tableRefIterator) throws SQLException { continue; } // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely) - long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, - multiRowMutationState) : serverTimeStamps[i++]; - Long scn = connection.getSCN(); - long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn; + long + serverTimestamp = + serverTimeStamps == null ? + validateAndGetServerTimestamp(tableRef, multiRowMutationState) : + serverTimeStamps[i++]; final PTable table = tableRef.getTable(); - Iterator>> mutationsIterator = addRowMutations(tableRef, - multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll); + Long scn = connection.getSCN(); + long mutationTimestamp = scn == null ? + (table.isTransactional() == true ? 
HConstants.LATEST_TIMESTAMP : EnvironmentEdgeManager.currentTimeMillis()) + : scn; + Iterator>> + mutationsIterator = + addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, + serverTimestamp, false, sendAll); // build map from physical table to mutation list boolean isDataTable = true; while (mutationsIterator.hasNext()) { @@ -910,7 +924,9 @@ private void send(Iterator tableRefIterator) throws SQLException { PName hTableName = pair.getFirst(); List mutationList = pair.getSecond(); TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef); - List oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList); + List + oldMutationList = + physicalTableMutationMap.put(tableInfo, mutationList); if (oldMutationList != null) mutationList.addAll(0, oldMutationList); isDataTable = false; } @@ -931,220 +947,300 @@ private void send(Iterator tableRefIterator) throws SQLException { joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations); } } - long serverTimestamp = HConstants.LATEST_TIMESTAMP; - Iterator>> mutationsIterator = physicalTableMutationMap.entrySet() - .iterator(); - while (mutationsIterator.hasNext()) { - Entry> pair = mutationsIterator.next(); - TableInfo tableInfo = pair.getKey(); - byte[] htableName = tableInfo.getHTableName().getBytes(); - List mutationList = pair.getValue(); - List> mutationBatchList = - getMutationBatchList(batchSize, batchSizeBytes, mutationList); - - // create a span per target table - // TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName)); - - int retryCount = 0; - boolean shouldRetry = false; - long numMutations = 0; - long mutationSizeBytes = 0; - long mutationCommitTime = 0; - long numFailedMutations = 0; - ; - long startTime = 0; - boolean shouldRetryIndexedMutation = false; - IndexWriteException iwe = null; - do { - TableRef origTableRef = tableInfo.getOrigTableRef(); - PTable table = origTableRef.getTable(); - table.getIndexMaintainers(indexMetaDataPtr, connection); - final ServerCache cache = tableInfo.isDataTable() ? - IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, - mutationList, indexMetaDataPtr) : null; - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers. - shouldRetry = cache != null; - SQLException sqlE = null; - Table hTable = connection.getQueryServices().getTable(htableName); - try { - if (table.isTransactional()) { - // Track tables to which we've sent uncommitted data - if (tableInfo.isDataTable()) { - uncommittedPhysicalNames.add(table.getPhysicalName().getString()); - phoenixTransactionContext.markDMLFence(table); - } - // Only pass true for last argument if the index is being written to on it's own (i.e. initial - // index population), not if it's being written to for normal maintenance due to writes to - // the data table. This case is different because the initial index population does not need - // to be done transactionally since the index is only made active after all writes have - // occurred successfully. 
- hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); + + Map> unverifiedIndexMutations = new LinkedHashMap<>(); + Map> verifiedOrDeletedIndexMutations = new LinkedHashMap<>(); + filterIndexCheckerMutations(physicalTableMutationMap, unverifiedIndexMutations, + verifiedOrDeletedIndexMutations); + + // Phase 1: Send index mutations with the empty column value = "unverified" + sendMutations(unverifiedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr); + + // Phase 2: Send data table and other indexes + sendMutations(physicalTableMutationMap.entrySet().iterator(), span, indexMetaDataPtr); + + // Phase 3: Send put index mutations with the empty column value = "verified" and/or delete index mutations + try { + sendMutations(verifiedOrDeletedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr); + } catch (SQLException ex) { + // TODO: add a metric here + LOGGER.warn( + "Ignoring exception that happened during setting index verified value to verified=TRUE " + + verifiedOrDeletedIndexMutations.toString(), + ex); + } + + } + } + + private void sendMutations(Iterator>> mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr) + throws SQLException { + while (mutationsIterator.hasNext()) { + Entry> pair = mutationsIterator.next(); + TableInfo tableInfo = pair.getKey(); + byte[] htableName = tableInfo.getHTableName().getBytes(); + List mutationList = pair.getValue(); + List> mutationBatchList = + getMutationBatchList(batchSize, batchSizeBytes, mutationList); + + // create a span per target table + // TODO maybe we can be smarter about the table name to string here? + Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName)); + + int retryCount = 0; + boolean shouldRetry = false; + long numMutations = 0; + long mutationSizeBytes = 0; + long mutationCommitTime = 0; + long numFailedMutations = 0; + + long startTime = 0; + boolean shouldRetryIndexedMutation = false; + IndexWriteException iwe = null; + do { + TableRef origTableRef = tableInfo.getOrigTableRef(); + PTable table = origTableRef.getTable(); + table.getIndexMaintainers(indexMetaDataPtr, connection); + final ServerCache cache = tableInfo.isDataTable() ? + IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table, + mutationList, indexMetaDataPtr) : null; + // If we haven't retried yet, retry for this case only, as it's possible that + // a split will occur after we send the index metadata cache to all known + // region servers. 
+ shouldRetry = cache != null; + SQLException sqlE = null; + Table hTable = connection.getQueryServices().getTable(htableName); + try { + if (table.isTransactional()) { + // Track tables to which we've sent uncommitted data + if (tableInfo.isDataTable()) { + uncommittedPhysicalNames.add(table.getPhysicalName().getString()); + phoenixTransactionContext.markDMLFence(table); } - numMutations = mutationList.size(); - GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); - mutationSizeBytes = calculateMutationSize(mutationList); - - startTime = System.currentTimeMillis(); - child.addTimelineAnnotation("Attempt " + retryCount); - Iterator> itrListMutation = mutationBatchList.iterator(); - while (itrListMutation.hasNext()) { - final List mutationBatch = itrListMutation.next(); - if (shouldRetryIndexedMutation) { - // if there was an index write failure, retry the mutation in a loop - final Table finalHTable = hTable; - final ImmutableBytesWritable finalindexMetaDataPtr = - indexMetaDataPtr; - final PTable finalPTable = table; - PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { - @Override - public void doMutation() throws IOException { - try { - finalHTable.batch(mutationBatch, null); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } catch (IOException e) { - e = updateTableRegionCacheIfNecessary(e); - throw e; - } + // Only pass true for last argument if the index is being written to on it's own (i.e. initial + // index population), not if it's being written to for normal maintenance due to writes to + // the data table. This case is different because the initial index population does not need + // to be done transactionally since the index is only made active after all writes have + // occurred successfully. 
+ hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); + } + numMutations = mutationList.size(); + GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); + mutationSizeBytes = calculateMutationSize(mutationList); + + startTime = System.currentTimeMillis(); + child.addTimelineAnnotation("Attempt " + retryCount); + Iterator> itrListMutation = mutationBatchList.iterator(); + while (itrListMutation.hasNext()) { + final List mutationBatch = itrListMutation.next(); + if (shouldRetryIndexedMutation) { + // if there was an index write failure, retry the mutation in a loop + final Table finalHTable = hTable; + final ImmutableBytesWritable finalindexMetaDataPtr = + indexMetaDataPtr; + final PTable finalPTable = table; + PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() { + @Override + public void doMutation() throws IOException { + try { + finalHTable.batch(mutationBatch, null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } catch (IOException e) { + e = updateTableRegionCacheIfNecessary(e); + throw e; } + } - @Override - public List getMutationList() { - return mutationBatch; - } + @Override + public List getMutationList() { + return mutationBatch; + } - private IOException - updateTableRegionCacheIfNecessary(IOException ioe) { - SQLException sqlE = - ServerUtil.parseLocalOrRemoteServerException(ioe); - if (sqlE != null - && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND - .getErrorCode()) { - try { - connection.getQueryServices().clearTableRegionCache( + private IOException + updateTableRegionCacheIfNecessary(IOException ioe) { + SQLException sqlE = + ServerUtil.parseLocalOrRemoteServerException(ioe); + if (sqlE != null + && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND + .getErrorCode()) { + try { + connection.getQueryServices().clearTableRegionCache( finalHTable.getName()); - IndexMetaDataCacheClient.setMetaDataOnMutations( + IndexMetaDataCacheClient.setMetaDataOnMutations( connection, finalPTable, mutationBatch, finalindexMetaDataPtr); - } catch (SQLException e) { - return ServerUtil.createIOException( + } catch (SQLException e) { + return ServerUtil.createIOException( "Exception during updating index meta data cache", ioe); - } } - return ioe; } - }, iwe, connection, connection.getQueryServices().getProps()); - shouldRetryIndexedMutation = false; - } else { - hTable.batch(mutationBatch, null); - } - // remove each batch from the list once it gets applied - // so when failures happens for any batch we only start - // from that batch only instead of doing duplicate reply of already - // applied batches from entire list, also we can set - // REPLAY_ONLY_INDEX_WRITES for first batch - // only in case of 1121 SQLException - itrListMutation.remove(); - - batchCount++; - if (LOGGER.isDebugEnabled()) - LOGGER.debug("Sent batch of " + mutationBatch.size() + " for " - + Bytes.toString(htableName)); - } - child.stop(); - child.stop(); - shouldRetry = false; - mutationCommitTime = System.currentTimeMillis() - startTime; - GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); - numFailedMutations = 0; - - // Remove batches as we process them - mutations.remove(origTableRef); - if (tableInfo.isDataTable()) { - numRows -= numMutations; - // recalculate the estimated size - estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations); + return ioe; + } + }, iwe, connection, 
connection.getQueryServices().getProps()); + shouldRetryIndexedMutation = false; + } else { + hTable.batch(mutationBatch, null); } - } catch (Exception e) { - mutationCommitTime = System.currentTimeMillis() - startTime; - serverTimestamp = ServerUtil.parseServerTimestamp(e); - SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); - if (inferredE != null) { - if (shouldRetry - && retryCount == 0 - && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND - .getErrorCode()) { - // Swallow this exception once, as it's possible that we split after sending the index - // metadata - // and one of the region servers doesn't have it. This will cause it to have it the next - // go around. - // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " - + inferredE; - LOGGER.warn(LogUtil.addCustomAnnotations(msg, connection)); - connection.getQueryServices().clearTableRegionCache(TableName.valueOf(htableName)); - - // add a new child span as this one failed - child.addTimelineAnnotation(msg); - child.stop(); - child = Tracing.child(span, "Failed batch, attempting retry"); - - continue; - } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { - iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); - if (iwe != null && !shouldRetryIndexedMutation) { - // For an index write failure, the data table write succeeded, - // so when we retry we need to set REPLAY_WRITES - // for first batch in list only. - for (Mutation m : mutationBatchList.get(0)) { - if (!PhoenixIndexMetaData.isIndexRebuild( + // remove each batch from the list once it gets applied + // so when failures happens for any batch we only start + // from that batch only instead of doing duplicate reply of already + // applied batches from entire list, also we can set + // REPLAY_ONLY_INDEX_WRITES for first batch + // only in case of 1121 SQLException + itrListMutation.remove(); + + batchCount++; + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Sent batch of " + mutationBatch.size() + " for " + + Bytes.toString(htableName)); + } + child.stop(); + child.stop(); + shouldRetry = false; + mutationCommitTime = System.currentTimeMillis() - startTime; + GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); + numFailedMutations = 0; + + // Remove batches as we process them + mutations.remove(origTableRef); + if (tableInfo.isDataTable()) { + numRows -= numMutations; + // recalculate the estimated size + estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations); + } + } catch (Exception e) { + mutationCommitTime = System.currentTimeMillis() - startTime; + long serverTimestamp = ServerUtil.parseServerTimestamp(e); + SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); + if (inferredE != null) { + if (shouldRetry + && retryCount == 0 + && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND + .getErrorCode()) { + // Swallow this exception once, as it's possible that we split after sending the index + // metadata + // and one of the region servers doesn't have it. This will cause it to have it the next + // go around. + // If it fails again, we don't retry. + String msg = "Swallowing exception and retrying after clearing meta cache on connection. 
" + + inferredE; + LOGGER.warn(LogUtil.addCustomAnnotations(msg, connection)); + connection.getQueryServices().clearTableRegionCache(TableName.valueOf(htableName)); + + // add a new child span as this one failed + child.addTimelineAnnotation(msg); + child.stop(); + child = Tracing.child(span, "Failed batch, attempting retry"); + + continue; + } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) { + iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE); + if (iwe != null && !shouldRetryIndexedMutation) { + // For an index write failure, the data table write succeeded, + // so when we retry we need to set REPLAY_WRITES + // for first batch in list only. + for (Mutation m : mutationBatchList.get(0)) { + if (!PhoenixIndexMetaData.isIndexRebuild( m.getAttributesMap())){ - m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, + m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES - ); - } - PhoenixKeyValueUtil.setTimestamp(m, serverTimestamp); + ); } - shouldRetry = true; - shouldRetryIndexedMutation = true; - continue; + PhoenixKeyValueUtil.setTimestamp(m, serverTimestamp); } + shouldRetry = true; + shouldRetryIndexedMutation = true; + continue; } - e = inferredE; } - // Throw to client an exception that indicates the statements that - // were not committed successfully. - int[] uncommittedStatementIndexes = getUncommittedStatementIndexes(); - sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp); - numFailedMutations = uncommittedStatementIndexes.length; - GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); + e = inferredE; + } + // Throw to client an exception that indicates the statements that + // were not committed successfully. 
+ int[] uncommittedStatementIndexes = getUncommittedStatementIndexes(); + sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp); + numFailedMutations = uncommittedStatementIndexes.length; + GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations); + } finally { + MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, + mutationCommitTime, numFailedMutations); + mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); + try { + if (cache != null) cache.close(); } finally { - MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, - mutationCommitTime, numFailedMutations); - mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); try { - if (cache != null) cache.close(); - } finally { - try { - hTable.close(); - } catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); - } + hTable.close(); + } catch (IOException e) { + if (sqlE != null) { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } else { + sqlE = ServerUtil.parseServerException(e); } - if (sqlE != null) { throw sqlE; } } + if (sqlE != null) { throw sqlE; } } - } while (shouldRetry && retryCount++ < 1); + } + } while (shouldRetry && retryCount++ < 1); + } + } + + private void filterIndexCheckerMutations(Map> mutationMap, + Map> unverifiedIndexMutations, + Map> verifiedOrDeletedIndexMutations) throws SQLException { + Iterator>> mapIter = mutationMap.entrySet().iterator(); + + while (mapIter.hasNext()) { + Entry> pair = mapIter.next(); + TableInfo tableInfo = pair.getKey(); + if (IndexUtil.isGlobalIndexCheckerEnabled(connection, tableInfo.getHTableName())) { + PTable table = tableInfo.getOrigTableRef().getTable(); + byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table); + byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + + List mutations = pair.getValue(); + + for (Mutation m : mutations) { + if (m == null) { + continue; + } + if (m instanceof Delete) { + Put put = new Put(m.getRow()); + put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m), + IndexRegionObserver.UNVERIFIED_BYTES); + // The Delete gets marked as unverified in Phase 1 and gets deleted on Phase 3. + addToMap(unverifiedIndexMutations, tableInfo, put); + addToMap(verifiedOrDeletedIndexMutations, tableInfo, m); + } else { + // These mutations already have Unverified_Bytes set. + long timestamp = IndexRegionObserver.getMaxTimestamp(m); + ((Put)m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES); + addToMap(unverifiedIndexMutations, tableInfo, m); + // Set verified bytes for Phase 3. 
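                    // Summary of the rewrite applied in this loop: an index Put is sent as-is in
                    // Phase 1 with the empty column stamped UNVERIFIED, and a second small Put
                    // flipping that column to VERIFIED is queued for Phase 3; an index Delete is
                    // replaced in Phase 1 by an UNVERIFIED marker Put and the original Delete is
                    // deferred to Phase 3, so readers never see a verified index row whose
                    // data-table write has not committed.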
+ Put verifiedPut = new Put(m.getRow()); + verifiedPut.addColumn(emptyCF, emptyCQ, timestamp, + IndexRegionObserver.VERIFIED_BYTES); + addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut); + } + } + + mapIter.remove(); } + + } + } + + private void addToMap(Map> map, TableInfo tableInfo, Mutation mutation) { + List mutations = null; + if (map.containsKey(tableInfo)) { + mutations = map.get(tableInfo); + } else { + mutations = Lists.newArrayList(); } + mutations.add(mutation); + map.put(tableInfo, mutations); } /** @@ -1269,7 +1365,7 @@ public void commit() throws SQLException { LOGGER.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount); retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION - .getErrorCode() && retryCount < MAX_COMMIT_RETRIES); + .getErrorCode() && retryCount < MAX_COMMIT_RETRIES); if (sqlE == null) { sqlE = e; } else { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index e641503fabf..91954636d60 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -382,7 +382,7 @@ public void preBatchMutate(ObserverContext c, "Somehow didn't return an index update but also didn't propagate the failure to the client!"); } - private long getMaxTimestamp(Mutation m) { + public static long getMaxTimestamp(Mutation m) { long maxTs = 0; long ts = 0; Iterator iterator = m.getFamilyCellMap().entrySet().iterator(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index b860f69d7a4..a9248894658 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -974,10 +974,11 @@ public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGette put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, + this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR)); put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { // map from index column family to list of pair of index column and data column (for covered columns) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 133737f8771..a45448fcf46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -913,7 +913,8 @@ private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder, P if (tableType == PTableType.INDEX && !isTransactional) { if (!indexRegionObserverEnabled && newDesc.hasCoprocessor(GlobalIndexChecker.class.getName())) { builder.removeCoprocessor(GlobalIndexChecker.class.getName()); - } else if (indexRegionObserverEnabled && !newDesc.hasCoprocessor(GlobalIndexChecker.class.getName())) { + } else if (indexRegionObserverEnabled && !newDesc.hasCoprocessor(GlobalIndexChecker.class.getName()) && + !isLocalIndexTable(newDesc.getColumnFamilyNames())) { builder.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 65189fc1ff6..20cc86e6f62 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -193,6 +193,7 @@ public class QueryServicesOptions { public static final long DEFAULT_GROUPBY_MAX_CACHE_MAX = 1024L*1024L*100L; // 100 Mb public static final long DEFAULT_SEQUENCE_CACHE_SIZE = 100; // reserve 100 sequences at a time + public static final int GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN = 10; public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS = 60000 * 30; // 30 mins public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE = 1024L*1024L*20L; // 20 Mb public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE = 1024L*1024L*10L; // 10 Mb diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 64650b26bf1..fe965b57111 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -40,7 +40,11 @@ import java.util.ListIterator; import java.util.Map; import java.util.NavigableSet; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; @@ -95,6 +99,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -105,6 +110,7 @@ import org.apache.phoenix.parse.SelectStatement; import 
org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; @@ -137,7 +143,12 @@ public class IndexUtil { public static final String INDEX_COLUMN_NAME_SEP = ":"; public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP); - + + private static Cache IndexNameGlobalIndexCheckerEnabledMap = CacheBuilder.newBuilder() + .expireAfterWrite(QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN, + TimeUnit.MINUTES) + .build(); + private IndexUtil() { } @@ -294,11 +305,37 @@ private static boolean isEmptyKeyValue(PTable table, ColumnReference ref) { .getLength()) == 0); } + + public static boolean isGlobalIndexCheckerEnabled(PhoenixConnection connection, PName index) + throws SQLException { + String indexName = index.getString(); + Boolean entry = IndexNameGlobalIndexCheckerEnabledMap.getIfPresent(indexName); + if (entry != null){ + return entry; + } + + boolean result = false; + try { + TableDescriptor desc = connection.getQueryServices().getTableDescriptor(index.getBytes()); + + if (desc != null) { + if (desc.hasCoprocessor(GlobalIndexChecker.class.getName())) { + result = true; + } + } + IndexNameGlobalIndexCheckerEnabledMap.put(indexName, result); + } catch (TableNotFoundException ex) { + // We can swallow this because some indexes don't have separate tables like local indexes + } + + return result; + } + public static List generateIndexData(final PTable table, PTable index, final MultiRowMutationState multiRowMutationState, List dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection) throws SQLException { try { - final ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + final ImmutableBytesPtr ptr = new ImmutableBytesPtr(); IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); List indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size()); for (final Mutation dataMutation : dataMutations) { @@ -312,12 +349,12 @@ public static List generateIndexData(final PTable table, PTable index, */ if (dataMutation instanceof Put) { ValueGetter valueGetter = new ValueGetter() { - - @Override + + @Override public byte[] getRowKey() { - return dataMutation.getRow(); - } - + return dataMutation.getRow(); + } + @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { // Always return null for our empty key value, as this will cause the index @@ -334,15 +371,15 @@ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { } for (Cell kv : kvs) { if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), family, 0, family.length) == 0 && - Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) { - ImmutableBytesPtr ptr = new ImmutableBytesPtr(); - kvBuilder.getValueAsPtr(kv, ptr); - return ptr; + Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) { + ImmutableBytesPtr ptr = new ImmutableBytesPtr(); + kvBuilder.getValueAsPtr(kv, ptr); + return ptr; } } return null; } - + }; byte[] regionStartKey = null; byte[] regionEndkey = null; @@ -906,7 +943,7 @@ else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) { } public static void 
setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException { - if (table.isTransactional() || table.isImmutableRows() || table.getType() != PTableType.INDEX) { + if (table.isTransactional() || table.getType() != PTableType.INDEX) { return; } PTable indexTable = table; diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 90e3158af4e..fef745778f2 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -51,7 +51,6 @@ import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -73,7 +72,6 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.phoenix.compile.AggregationManager; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.SequenceManager; @@ -877,8 +875,12 @@ public static void dumpTable(Table table) throws IOException { } public static int getRawRowCount(Table table) throws IOException { + return getRowCount(table, true); + } + + public static int getRowCount(Table table, boolean isRaw) throws IOException { Scan s = new Scan(); - s.setRaw(true);; + s.setRaw(isRaw);; s.setMaxVersions(); int rows = 0; try (ResultScanner scanner = table.getScanner(s)) { @@ -939,6 +941,7 @@ public IndexStateCheck(PIndexState indexState, Long indexDisableTimestamp, Boole public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { int maxTries = 60, nTries = 0; + PIndexState actualIndexState = null; do { String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName); String index = SchemaUtil.getTableNameFromFullName(fullIndexName); @@ -948,7 +951,6 @@ public static void waitForIndexState(Connection conn, String fullIndexName, PInd + ") = (" + "'" + schema + "','" + index + "') " + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL"; ResultSet rs = conn.createStatement().executeQuery(query); - PIndexState actualIndexState = null; if (rs.next()) { actualIndexState = PIndexState.fromSerializedValue(rs.getString(1)); boolean matchesExpected = (actualIndexState == expectedIndexState); @@ -957,7 +959,8 @@ public static void waitForIndexState(Connection conn, String fullIndexName, PInd } } } while (++nTries < maxTries); - fail("Ran out of time waiting for index state to become " + expectedIndexState); + fail("Ran out of time waiting for index state to become " + expectedIndexState + " last seen actual state is " + + (actualIndexState == null ? "Unknown" : actualIndexState.toString())); } public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
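
The heart of this change is the phased commit order in MutationState.send(): consistent global index rows are first written with their empty column set to "unverified", the data table is written next, and only then are the index rows flipped to "verified" (or actually deleted). The sketch below, which uses a hypothetical Batch abstraction rather than Phoenix's real internals, illustrates why a failure between phases can only ever leave unverified index rows behind, which GlobalIndexChecker can skip or repair at read time.

import java.io.IOException;
import java.util.List;

// Not the patch itself: a minimal sketch of the three-phase write order, with a
// hypothetical Batch standing in for the per-table mutation lists built above.
final class ConsistentIndexCommitSketch {
    interface Batch { void flush() throws IOException; }

    static void commit(List<Batch> unverifiedIndexWrites,
                       List<Batch> dataTableWrites,
                       List<Batch> verifiedOrDeletedIndexWrites) throws IOException {
        for (Batch b : unverifiedIndexWrites) {
            b.flush();      // Phase 1: index rows land with empty column = UNVERIFIED
        }
        for (Batch b : dataTableWrites) {
            b.flush();      // Phase 2: data table (and any non-consistent indexes)
        }
        for (Batch b : verifiedOrDeletedIndexWrites) {
            try {
                b.flush();  // Phase 3: flip empty column to VERIFIED, or apply the real Deletes
            } catch (IOException e) {
                // A Phase 3 failure is tolerable: the rows stay UNVERIFIED and are
                // repaired (or ignored) by GlobalIndexChecker on the read path.
            }
        }
    }
}

A Phase 1 or Phase 2 failure still surfaces to the caller as a CommitException, which matches the patch: only the Phase 3 flush is wrapped in a catch-and-log block.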
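IndexUtil.isGlobalIndexCheckerEnabled() above avoids a getTableDescriptor() round trip on every mutation batch by memoizing, per index name, whether the GlobalIndexChecker coprocessor is loaded, in a Guava cache that expires after GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN minutes. A self-contained sketch of that caching pattern follows; the descriptor lookup is represented by an injected Function and is not Phoenix's API.

import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Sketch of the expiring memoization used for the coprocessor check.
final class CoprocessorFlagCache {
    private final Cache<String, Boolean> cache = CacheBuilder.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)   // GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN
            .build();
    private final Function<String, Boolean> descriptorLookup;

    CoprocessorFlagCache(Function<String, Boolean> descriptorLookup) {
        this.descriptorLookup = descriptorLookup;
    }

    boolean isEnabled(String indexName) {
        Boolean cached = cache.getIfPresent(indexName);
        if (cached != null) {
            return cached;
        }
        boolean enabled = Boolean.TRUE.equals(descriptorLookup.apply(indexName));
        cache.put(indexName, enabled);
        return enabled;
    }
}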