diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java index f023a745044..2843c11e917 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.end2end; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -27,27 +30,35 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.ScanInfoUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.phoenix.coprocessor.IndexRepairRegionScanner; +import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.mapreduce.index.IndexTool.IndexDisableLoggingType; +import org.apache.phoenix.mapreduce.index.IndexTool.IndexVerifyType; import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; +import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow; import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ByteUtil; import 
org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexScrutiny; +import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -59,15 +70,27 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Properties; import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES; -import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; import static 
org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +@Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT { @@ -93,12 +116,22 @@ public static synchronized Collection data() { {false} }); } + @BeforeClass + public static synchronized void doSetup() throws Exception { + // below settings are needed to enforce major compaction: both thresholds are set to 0 for this test driver + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(0)); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + @Before public void createIndexToolTables() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { IndexTool.createIndexToolTables(conn); } + resetIndexRegionObserverFailPoints(); } @After @@ -111,29 +144,7 @@ public void cleanup() throws Exception { TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME)); } EnvironmentEdgeManager.reset(); - } - - private void repairIndex(Connection conn, String schemaName, String dataTableFullName, String indexTableName, IndexTool.IndexVerifyType verifyType) throws Exception { - PTable pDataTable = PhoenixRuntime.getTable(conn, dataTableFullName); - PTable pIndexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName)); - Table hTable = conn.unwrap(PhoenixConnection.class).getQueryServices() - .getTable(pIndexTable.getPhysicalName().getBytes()); - Scan scan = new Scan(); - scan.setRaw(true); - scan.setMaxVersions(); - ImmutableBytesWritable ptr = new 
ImmutableBytesWritable(); - PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class); - IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection); - scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, verifyType.toBytes()); - scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, pDataTable.getPhysicalName().getBytes()); - scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); - scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES); - scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD)); - scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); - scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); - ResultScanner scanner = hTable.getScanner(scan); - for (Result result = scanner.next(); result != null; result = scanner.next()) { - } + resetIndexRegionObserverFailPoints(); } private void setIndexRowStatusesToVerified(Connection conn, String dataTableFullName, String indexTableFullName) throws Exception { @@ -154,6 +165,112 @@ private void setIndexRowStatusesToVerified(Connection conn, String dataTableFull } } + private void initTablesAndAddExtraRowsToIndex(Connection conn, String schemaName, String dataTableName, + String indexTableName, int NROWS) throws Exception { + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); + PreparedStatement dataPreparedStatement = + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + for (int i = 1; i <= NROWS; i++) { + dataPreparedStatement.setInt(1, i); + 
dataPreparedStatement.setInt(2, i + 1); + dataPreparedStatement.setInt(3, i * 2); + dataPreparedStatement.execute(); + } + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); + + // Add extra index rows + PreparedStatement indexPreparedStatement = + conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); + + for (int i = NROWS + 1; i <= 2 * NROWS; i++) { + indexPreparedStatement.setInt(1, i + 1); // the indexed column + indexPreparedStatement.setInt(2, i); // the data pk column + indexPreparedStatement.setInt(3, i * 2); // the included column + indexPreparedStatement.execute(); + } + conn.commit(); + + // Set all index row statuses to verified so that read verify will not fix them. We want them to be fixed + // by IndexRepairRegionScanner + setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName); + } + + private void truncateIndexToolTables() throws IOException { + getUtility().getHBaseAdmin().disableTable(TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); + getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME), true); + getUtility().getHBaseAdmin().disableTable(TableName.valueOf(RESULT_TABLE_NAME)); + getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true); + } + + private void assertExtraCounters(IndexTool indexTool, long extraVerified, long extraUnverified, + boolean isBefore) throws IOException { + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + + if (isBefore) { + assertEquals(extraVerified, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(extraUnverified, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + } else { + assertEquals(extraVerified, + 
mrJobCounters.findCounter(AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(extraUnverified, + mrJobCounters.findCounter(AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + } + } + + private void assertDisableLogging(Connection conn, int expectedRows, + IndexTool.IndexVerifyType verifyType, + IndexTool.IndexDisableLoggingType disableLoggingType, + byte[] expectedPhase, + String schemaName, String dataTableName, + String indexTableName, String indexTableFullName, + int expectedStatus) throws Exception { + + IndexTool tool = IndexToolIT.runIndexTool(getUtility().getConfiguration(), true, false, schemaName, dataTableName, + indexTableName, + null, + expectedStatus, verifyType, disableLoggingType, "-fi"); + assertNotNull(tool); + byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName); + + IndexVerificationOutputRepository outputRepository = + new IndexVerificationOutputRepository(indexTableFullNameBytes, conn); + List<IndexVerificationOutputRow> rows = + outputRepository.getAllOutputRows(); + try { + assertEquals(expectedRows, rows.size()); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); + throw e; + } + if (expectedRows > 0) { + assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue()); + } + } + + static private void resetIndexRegionObserverFailPoints() { + IndexRegionObserver.setFailPreIndexUpdatesForTesting(false); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + } + + static private void commitWithException(Connection conn) { + try { + conn.commit(); + resetIndexRegionObserverFailPoints(); + fail(); + } catch (Exception e) { + // this is expected + } + } + @Test public void testRepairExtraIndexRows() throws Exception { final int NROWS = 20; @@ -163,12 +280,55 @@ public void testRepairExtraIndexRows() throws Exception { String indexTableName = generateUniqueName(); 
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS); + + // do index rebuild without -fi and check with scrutiny that index tool failed to fix the extra rows + IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE); + + boolean failed; + try { + IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + failed = false; + } catch (AssertionError e) { + failed = true; + } + assertTrue(failed); + + // now repair the index with -fi + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi"); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + + assertExtraCounters(indexTool, NROWS, 0, true); + } + } + + @Test + public void testRepairExtraIndexRows_PostIndexUpdateFailure_overwrite() throws Exception { + if (!mutable) { + return; + } + final int NROWS = 4; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.createStatement().execute("CREATE TABLE " + dataTableFullName - + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " - + tableDDLOptions); + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); + 
conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); + PreparedStatement dataPreparedStatement = - conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); for (int i = 1; i <= NROWS; i++) { dataPreparedStatement.setInt(1, i); dataPreparedStatement.setInt(2, i + 1); @@ -176,22 +336,191 @@ public void testRepairExtraIndexRows() throws Exception { dataPreparedStatement.execute(); } conn.commit(); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(3, 100, 200)"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi"); + + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + assertEquals(2, + mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(2, + mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, "-fi"); + mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + 
mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + } + } + + @Test + public void testRepairExtraIndexRows_PostIndexUpdateFailure_delete() throws Exception { + if (!mutable) { + return; + } + final int NROWS = 4; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); conn.createStatement().execute(String.format( - "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); - // Add extra index rows - PreparedStatement indexPreparedStatement = - conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); - for (int i = NROWS + 1; i <= 2 * NROWS; i++) { - indexPreparedStatement.setInt(1, i + 1); // the indexed column - indexPreparedStatement.setInt(2, i); // the data pk column - indexPreparedStatement.setInt(3, i * 2); // the included column - indexPreparedStatement.execute(); + PreparedStatement dataPreparedStatement = + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + for (int i = 1; i <= NROWS; i++) { + dataPreparedStatement.setInt(1, i); + dataPreparedStatement.setInt(2, i + 1); + 
dataPreparedStatement.setInt(3, i * 2); + dataPreparedStatement.execute(); } conn.commit(); - // Set all index row statuses to verified so that read verify will not fix them. We want them to be fixed - // by IndexRepairRegionScanner - setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName); + + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + conn.createStatement().execute("DELETE FROM " + dataTableFullName + " WHERE ID = 3"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + TestUtil.doMajorCompaction(conn, dataTableFullName); + + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi"); + + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(1, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, "-fi"); + mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, 
indexTableFullName); + assertEquals(NROWS - 1, actualRowCount); + } + } + + @Test + public void testRepairExtraIndexRows_DataTableUpdateFailure() throws Exception { + if (!mutable) { + return; + } + final int NROWS = 20; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); + + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + + PreparedStatement dataPreparedStatement = + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + for (int i = 1; i <= NROWS; i++) { + dataPreparedStatement.setInt(1, i); + dataPreparedStatement.setInt(2, i + 1); + dataPreparedStatement.setInt(3, i * 2); + dataPreparedStatement.execute(); + } + commitWithException(conn); + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); // the failure is injected only for the commit above; clear it before IndexTool runs + + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi"); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(0, actualRowCount); + + assertExtraCounters(indexTool, 0, NROWS, true); + } + } + + @Test + public void testPITRow() throws Exception { + final int NROWS = 1; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = 
SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS); + + IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, "-fi"); + + Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); + String expectedErrorMsg = IndexRepairRegionScanner.ERROR_MESSAGE_EXTRA_INDEX_ROW; + String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + assertTrue(actualErrorMsg.contains(expectedErrorMsg)); + } + } + + @Test + public void testVerifyAfterExtraIndexRows() throws Exception { + final int NROWS = 20; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS); + + // Run -v AFTER and check it doesn't fix the extra rows and the job fails + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, -1, IndexVerifyType.AFTER, "-fi"); + boolean failed; try { IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); @@ -200,11 +529,275 @@ public void testRepairExtraIndexRows() throws Exception { failed = true; } assertTrue(failed); - // 
Repair the index - repairIndex(conn, schemaName, dataTableFullName, indexTableName, IndexTool.IndexVerifyType.BEFORE); - long actualRowCount = IndexScrutiny - .scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + + // job failed so no counters are output + } + } + + @Test + public void testVerifyBothExtraIndexRows() throws Exception { + final int NROWS = 20; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS); + + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BOTH, "-fi"); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); + + assertExtraCounters(indexTool, 0, 0, false); + } + } + + @Test + public void testOverrideIndexRebuildPageSizeFromIndexTool() throws Exception { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + final int NROWS = 20; + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS); + + Configuration conf = new Configuration(getUtility().getConfiguration()); + 
conf.set(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(2)); + IndexTool indexTool = IndexToolIT.runIndexTool(conf,false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.BEFORE, IndexDisableLoggingType.NONE,"-fi"); + + long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(NROWS, actualRowCount); + + assertExtraCounters(indexTool, NROWS, 0, true); + } + } + + @Test + public void testViewIndexExtraRows() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String viewName = generateUniqueName(); + String viewFullName = SchemaUtil.getTableName(schemaName, viewName); + String indexTableName1 = generateUniqueName(); + String indexTableFullName1 = SchemaUtil.getTableName(schemaName, indexTableName1); + String indexTableName2 = generateUniqueName(); + String indexTableFullName2 = SchemaUtil.getTableName(schemaName, indexTableName2); + + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + + tableDDLOptions); + conn.commit(); + conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName); + conn.commit(); + // Insert a row + conn.createStatement().execute("UPSERT INTO " + viewFullName + " values (1, 2, 4)"); + conn.commit(); + + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName1, viewFullName)); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL2) INCLUDE (VAL1)", indexTableName2, viewFullName)); + + // directly insert a row into index + conn.createStatement().execute("UPSERT INTO " + 
indexTableFullName1 + " VALUES (4, 2, 8)"); + conn.createStatement().execute("UPSERT INTO " + indexTableFullName2 + " VALUES (8, 2, 4)"); + conn.commit(); + setIndexRowStatusesToVerified(conn, viewFullName, indexTableFullName1); + + IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, viewName, + indexTableName1, null, 0, IndexVerifyType.BEFORE, "-fi"); + assertExtraCounters(indexTool, 1, 0, true); + + indexTool = IndexToolIT.runIndexTool(false, false, schemaName, viewName, + indexTableName2, null, 0, IndexVerifyType.BEFORE, "-fi"); + assertExtraCounters(indexTool, 1, 0, true); + + String indexTablePhysicalName = "_IDX_" + dataTableFullName; // NOTE(review): assumes Phoenix's view index physical table prefix is "_IDX_" - confirm against MetaDataUtil + byte[] indexTableFullNameBytes = Bytes.toBytes(indexTablePhysicalName); + IndexVerificationOutputRepository outputRepository = + new IndexVerificationOutputRepository(indexTableFullNameBytes, conn); + List<IndexVerificationOutputRow> rows = + outputRepository.getAllOutputRows(); + try { + assertEquals(2, rows.size()); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); + throw e; + } + } + } + + @Test + public void testFromIndexToolForIncrementalVerify() throws Exception { + final int NROWS = 4; + ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge(); + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + long delta = 2; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + long t0 = EnvironmentEdgeManager.currentTimeMillis(); + customEdge.setValue(t0); + EnvironmentEdgeManager.injectEdge(customEdge); + + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 
INTEGER, VAL2 INTEGER) " + tableDDLOptions); + PreparedStatement dataPreparedStatement = + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + for (int i = 1; i <= NROWS; i++) { + dataPreparedStatement.setInt(1, i); + dataPreparedStatement.setInt(2, i + 1); + dataPreparedStatement.setInt(3, i * 2); + dataPreparedStatement.execute(); + } + conn.commit(); + + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); + + customEdge.incrementValue(delta); + long t1 = customEdge.currentTime(); + + IndexTool it; + it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, + "-fi", "-st", String.valueOf(t0), "-et", String.valueOf(t1)); + + CounterGroup mrJobCounters; + mrJobCounters = IndexToolIT.getMRJobCounters(it); + assertEquals(NROWS, + mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + // Add extra index rows + PreparedStatement indexPreparedStatement = + conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); + for (int i = NROWS + 1; i <= 2 * NROWS; i++) { + indexPreparedStatement.setInt(1, i + 1); // the indexed column + indexPreparedStatement.setInt(2, i); // the data pk column + indexPreparedStatement.setInt(3, i * 2); // the included column + indexPreparedStatement.execute(); + } + conn.commit(); + + // Set all index row statuses to verified so that read verify will not fix them. 
+ // We want them to be fixed by IndexRepairRegionScanner + setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName); + customEdge.incrementValue(delta); + long t2 = customEdge.currentTime(); + it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, + "-fi", "-st", String.valueOf(t1), "-et", String.valueOf(t2)); + + // incremental verification should only scan NROWS instead of total 2*NROWS + mrJobCounters = IndexToolIT.getMRJobCounters(it); + assertEquals(NROWS, + mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue()); + assertEquals(NROWS, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + + // now run another verification over the entire window [t0, t2] + it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, + indexTableName, null, 0, IndexVerifyType.ONLY, + "-fi", "-st", String.valueOf(t0), "-et", String.valueOf(t2)); + + mrJobCounters = IndexToolIT.getMRJobCounters(it); + assertEquals(2*NROWS, + mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue()); + assertEquals(NROWS, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, + mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue()); + } finally { + EnvironmentEdgeManager.reset(); + } + } + + @Test + public void testDisableOutputLogging() throws Exception { + if (!mutable) { + return; + } + final int NROWS = 4; + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName(); + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName(); + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try(Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE " + dataTableFullName + + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); + PreparedStatement dataPreparedStatement = + conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)"); + for (int i = 1; i <= NROWS; i++) { + dataPreparedStatement.setInt(1, i); + dataPreparedStatement.setInt(2, i + 1); + dataPreparedStatement.setInt(3, i * 2); + dataPreparedStatement.execute(); + } + conn.commit(); + conn.createStatement().execute(String.format( + "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName)); + + // Add extra index rows + PreparedStatement indexPreparedStatement = + conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); + for (int i = NROWS + 1; i <= 2 * NROWS; i++) { + indexPreparedStatement.setInt(1, i + 1); // the indexed column + indexPreparedStatement.setInt(2, i); // the data pk column + indexPreparedStatement.setInt(3, i * 2); // the included column + indexPreparedStatement.execute(); + } + conn.commit(); + + // Set all index row statuses to verified so that read verify will not fix them. + // We want them to be fixed by IndexRepairRegionScanner + setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName); + + // run the index MR job as ONLY so the index doesn't get rebuilt. Should be NROWS number + // of extra rows. 
We pass in --disable-logging BEFORE to silence the output logging to + // PHOENIX_INDEX_TOOL + assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.ONLY, + IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName, + indexTableFullName, 0); + truncateIndexToolTables(); + + // logging to PHOENIX_INDEX_TOOL enabled + assertDisableLogging(conn, NROWS, IndexTool.IndexVerifyType.ONLY, + IndexTool.IndexDisableLoggingType.NONE, + IndexVerificationOutputRepository.PHASE_BEFORE_VALUE,schemaName, + dataTableName, indexTableName, + indexTableFullName, 0); + truncateIndexToolTables(); + + assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE, + IndexTool.IndexDisableLoggingType.BEFORE, + null, schemaName, + dataTableName, indexTableName, + indexTableFullName, 0); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 41ae086e4ce..e6941b2306b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -26,10 +26,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -42,13 +39,14 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Job; import 
org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; -import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow; import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; +import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters; import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; @@ -56,7 +54,6 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.transaction.TransactionFactory; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -86,15 +83,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT; @@ -636,6 +625,10 @@ public static void assertExplainPlan(boolean localIndex, String actualExplainPla actualExplainPlan.contains(expectedExplainPlan)); } + public static CounterGroup getMRJobCounters(IndexTool indexTool) throws IOException { + return indexTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName()); + } + private static List getArgList (boolean directApi, boolean useSnapshot, String schemaName, String dataTable, String indxTable, String tenantId, IndexTool.IndexVerifyType verifyType, Long startTime, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 500533961c7..aebfc769736 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -89,6 +89,7 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID; import static 
org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS; +import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_ROW; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW; import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS; @@ -111,6 +112,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty"; public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack"; public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row"; + public static final String ERROR_MESSAGE_EXTRA_INDEX_ROW = "Extra index row"; public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = "phoenix.index.mr.log.beyond.max.lookback.errors"; public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false; @@ -323,6 +325,7 @@ protected boolean isColumnIncluded(Cell cell) { byte[] qualifier = CellUtil.cloneQualifier(cell); return set.contains(qualifier); } + @VisibleForTesting public boolean shouldVerify(IndexTool.IndexVerifyType verifyType, byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer, @@ -374,6 +377,7 @@ private void closeTables() throws IOException { dataHTable.close(); } } + @Override public void close() throws IOException { innerScanner.close(); @@ -429,14 +433,19 @@ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, lon null, 
isBeforeRebuild, errorType); } + protected byte[] getDataTableName() { + return region.getRegionInfo().getTable().getName(); + } + @VisibleForTesting public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs, String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild, IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException { ungroupedAggregateRegionObserver.checkForRegionClosing(); + byte[] dataTableName = getDataTableName(); verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(), - region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType); + dataTableName, isBeforeRebuild, errorType); } private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) { @@ -623,6 +632,37 @@ private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs, List< } } + /** + * actualIndexMutationList is the list of all the mutations of a single extra index row (i.e. 
not referenced by data row) + * ordered by decreasing order of timestamps with Deletes before Puts + */ + private void logExtraIndexRowAndUpdateCounters(List actualIndexMutationList, + IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException { + for (Mutation m : actualIndexMutationList) { + // this extra row in the index table has already been deleted + if ((m instanceof Delete)) { + return; + } + + // check the empty column status of latest (most recent) put mutation + if (isVerified((Put) m)) { + verificationPhaseResult.setExtraVerifiedIndexRowCount( + verificationPhaseResult.getExtraVerifiedIndexRowCount() + 1); + } else { + verificationPhaseResult.setExtraUnverifiedIndexRowCount( + verificationPhaseResult.getExtraUnverifiedIndexRowCount() + 1); + } + + byte[] indexKey = m.getRow(); + byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants); + String errorMsg = ERROR_MESSAGE_EXTRA_INDEX_ROW; + IndexVerificationOutputRepository.IndexVerificationErrorType errorType = EXTRA_ROW; + logToIndexToolOutputTable(dataKey, indexKey, 0, getTimestamp(m), errorMsg, + isBeforeRebuild, errorType); + break; + } + } + /** * In this method, the actual list is repaired in memory using the expected list which is actually the output of * rebuilding the index table row. The result of this repair is used only for verification. 
@@ -767,6 +807,7 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List actualMut if (actualMutationList == null || actualMutationList.isEmpty()) { throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY); } + if (isBeforeRebuild) { Mutation m = actualMutationList.get(0); if (m instanceof Put && (mostRecentIndexRowKeys.isEmpty() || mostRecentIndexRowKeys.contains(m.getRow()))) { @@ -778,7 +819,9 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List actualMut if (verifyType == IndexTool.IndexVerifyType.ONLY) { repairActualMutationList(actualMutationList, expectedMutationList); } + // actualMutationList can be empty after returning from this function cleanUpActualMutationList(actualMutationList); + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); int actualIndex = 0; int expectedIndex = 0; @@ -883,6 +926,11 @@ public boolean verifySingleIndexRow(byte[] indexRowKey, List actualMut logMismatch(expected, actual, expectedIndex, verificationPhaseResult, isBeforeRebuild); } else { + if (expected == null) { + // Happens when the actualMutationList becomes empty after returning from + // the cleanUpActualMutationList function. + expected = expectedMutationList.get(0); + } byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants); String errorMsg = String.format("Not matching index row. expectedIndex=%d. expectedMutationSize=%d. actualIndex=%d. actualMutationSize=%d. expectedType=%s. actualType=%s", expectedIndex, expectedSize, actualIndex, actualSize, expected.getClass().getName(), (actualIndex < actualSize ? 
actual.getClass().getName() : "null")); @@ -909,10 +957,11 @@ protected void verifyIndexRows(Map> actualIndexMutationMa if (expectedMutationList != null) { if (!verifySingleIndexRow(entry.getKey(), entry.getValue(), expectedMutationList, mostRecentIndexRowKeys, indexRowsToBeDeleted, verificationPhaseResult, isBeforeRebuild)) { - invalidIndexRows.put(indexRowKey, actualIndexMutationMap.get(indexRowKey)); + invalidIndexRows.put(indexRowKey, expectedMutationList); } expectedIndexMutationMap.remove(indexRowKey); } else { + logExtraIndexRowAndUpdateCounters(entry.getValue(), verificationPhaseResult, isBeforeRebuild); indexRowsToBeDeleted.add(indexMaintainer.buildRowDeleteMutation(indexRowKey, IndexMaintainer.DeleteType.ALL_VERSIONS, getTimestamp(entry.getValue().get(0)))); } @@ -965,6 +1014,7 @@ protected void verifyIndexRows(Map> actualIndexMutationMa logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg, isBeforeRebuild, errorType); } + // Leave the invalid and missing rows in indexMutationMap expectedIndexMutationMap.putAll(invalidIndexRows); } @@ -1010,7 +1060,11 @@ protected void updateIndexRows(Map> indexMutationMap, verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexMutationMap.size()); } } catch (Throwable t) { - ServerUtil.throwIOException(indexHTable.getName().toString(), t); + if (indexHTable != null) { + ServerUtil.throwIOException(indexHTable.getName().toString(), t); + } else { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java index 39295b3ac35..5e69925c0fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java @@ -91,6 
+91,11 @@ public IndexRepairRegionScanner(final RegionScanner innerScanner, } } + @Override + public byte[] getDataTableName() { + return dataHTable.getName().toBytes(); + } + public void prepareExpectedIndexMutations(Result dataRow, Map> expectedIndexMutationMap) throws IOException { Put put = null; Delete del = null; @@ -178,6 +183,28 @@ private Map> populateActualIndexMutationMap(Map> populateActualIndexMutationMap() throws IOException { + Map> actualIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + Scan indexScan = new Scan(); + indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); + indexScan.setRaw(true); + indexScan.setMaxVersions(); + indexScan.setCacheBlocks(false); + try (RegionScanner regionScanner = region.getScanner(indexScan)) { + do { + ungroupedAggregateRegionObserver.checkForRegionClosing(); + List row = new ArrayList(); + hasMore = regionScanner.nextRaw(row); + if (!row.isEmpty()) { + populateIndexMutationFromIndexRow(row, actualIndexMutationMap); + } + } while (hasMore); + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + } + return actualIndexMutationMap; + } + private void repairAndOrVerifyIndexRows(Set dataRowKeys, Map> actualIndexMutationMap, IndexToolVerificationResult verificationResult) throws IOException { @@ -188,7 +215,7 @@ private void repairAndOrVerifyIndexRows(Set dataRowKeys, return; } if (verifyType == IndexTool.IndexVerifyType.ONLY) { - verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getBefore(), true); + verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true); return; } if (verifyType == IndexTool.IndexVerifyType.BEFORE) { @@ -200,11 +227,12 @@ private void repairAndOrVerifyIndexRows(Set dataRowKeys, } if (verifyType == IndexTool.IndexVerifyType.AFTER) { 
repairIndexRows(expectedIndexMutationMap, Collections.EMPTY_LIST, verificationResult); - verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getAfter(), false); + actualIndexMutationMap = populateActualIndexMutationMap(); + verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getAfter(), false); return; } if (verifyType == IndexTool.IndexVerifyType.BOTH) { - verifyIndexRows(actualIndexMutationMap,expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true); + verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true); if (!expectedIndexMutationMap.isEmpty() || !indexRowsToBeDeleted.isEmpty()) { repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult); } @@ -215,6 +243,7 @@ private void repairAndOrVerifyIndexRows(Set dataRowKeys, } } + private void addRepairAndOrVerifyTask(TaskBatch tasks, final Set dataRowKeys, final Map> actualIndexMutationMap, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java index 2e022639e91..5919e8f7b50 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java @@ -30,6 +30,8 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES; import static 
org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES; @@ -41,6 +43,8 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES; +import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY; import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES; @@ -123,6 +127,8 @@ public static class PhaseResult { private long unverifiedIndexRowCount = 0; private long oldIndexRowCount = 0; private long unknownIndexRowCount = 0; + 
private long extraVerifiedIndexRowCount = 0; + private long extraUnverifiedIndexRowCount = 0; public void add(PhaseResult phaseResult) { setBeyondMaxLookBackMissingIndexRowCount(getBeyondMaxLookBackMissingIndexRowCount() + @@ -138,16 +144,21 @@ public void add(PhaseResult phaseResult) { setUnverifiedIndexRowCount(getUnverifiedIndexRowCount() + phaseResult.getUnverifiedIndexRowCount()); setUnknownIndexRowCount(getUnknownIndexRowCount() + phaseResult.getUnknownIndexRowCount()); setOldIndexRowCount(getOldIndexRowCount() + phaseResult.getOldIndexRowCount()); + setExtraVerifiedIndexRowCount(getExtraVerifiedIndexRowCount() + + phaseResult.getExtraVerifiedIndexRowCount()); + setExtraUnverifiedIndexRowCount(getExtraUnverifiedIndexRowCount() + + phaseResult.getExtraUnverifiedIndexRowCount()); } public PhaseResult() { } public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, - long missingIndexRowCount, long invalidIndexRowCount, - long beyondMaxLookBackMissingIndexRowCount, - long beyondMaxLookBackInvalidIndexRowCount, - long indexHasExtraCellsCount, long indexHasMissingCellsCount) { + long missingIndexRowCount, long invalidIndexRowCount, + long beyondMaxLookBackMissingIndexRowCount, + long beyondMaxLookBackInvalidIndexRowCount, + long indexHasExtraCellsCount, long indexHasMissingCellsCount, + long extraVerifiedIndexRowCount, long extraUnverifiedIndexRowCount) { this.setValidIndexRowCount(validIndexRowCount); this.setExpiredIndexRowCount(expiredIndexRowCount); this.setMissingIndexRowCount(missingIndexRowCount); @@ -156,6 +167,8 @@ public PhaseResult(long validIndexRowCount, long expiredIndexRowCount, this.setBeyondMaxLookBackMissingIndexRowCount(beyondMaxLookBackMissingIndexRowCount); this.setIndexHasExtraCellsCount(indexHasExtraCellsCount); this.setIndexHasMissingCellsCount(indexHasMissingCellsCount); + this.setExtraVerifiedIndexRowCount(extraVerifiedIndexRowCount); + this.setExtraUnverifiedIndexRowCount(extraUnverifiedIndexRowCount); } @@ -172,6 +185,10 @@ 
public long getIndexHasMissingCellsCount() { return indexHasMissingCellsCount; } + public long getTotalExtraIndexRowsCount() { + return getExtraVerifiedIndexRowCount() + getExtraUnverifiedIndexRowCount() ; + } + @Override public String toString() { return "PhaseResult{" + @@ -184,6 +201,11 @@ public String toString() { ", beyondMaxLookBackInvalidIndexRowCount=" + getBeyondMaxLookBackInvalidIndexRowCount() + ", extraCellsOnIndexCount=" + indexHasExtraCellsCount + ", missingCellsOnIndexCount=" + indexHasMissingCellsCount + + ", unverifiedIndexRowCount=" + unverifiedIndexRowCount + + ", oldIndexRowCount=" + oldIndexRowCount + + ", unknownIndexRowCount=" + unknownIndexRowCount + + ", extraVerifiedIndexRowCount=" + extraVerifiedIndexRowCount + + ", extraUnverifiedIndexRowCount=" + extraUnverifiedIndexRowCount + '}'; } @@ -201,10 +223,14 @@ public boolean equals(Object o) { && this.validIndexRowCount == pr.validIndexRowCount && this.invalidIndexRowCount == pr.invalidIndexRowCount && this.missingIndexRowCount == pr.missingIndexRowCount - && this.getBeyondMaxLookBackInvalidIndexRowCount() == pr.getBeyondMaxLookBackInvalidIndexRowCount() - && this.getBeyondMaxLookBackMissingIndexRowCount() == pr.getBeyondMaxLookBackMissingIndexRowCount() + && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount + && this.beyondMaxLookBackMissingIndexRowCount== pr.beyondMaxLookBackMissingIndexRowCount && this.indexHasMissingCellsCount == pr.indexHasMissingCellsCount - && this.indexHasExtraCellsCount == pr.indexHasExtraCellsCount; + && this.indexHasExtraCellsCount == pr.indexHasExtraCellsCount + && this.oldIndexRowCount == pr.oldIndexRowCount + && this.unknownIndexRowCount == pr.unknownIndexRowCount + && this.extraVerifiedIndexRowCount == pr.extraVerifiedIndexRowCount + && this.extraUnverifiedIndexRowCount == pr.extraUnverifiedIndexRowCount; } @Override @@ -221,6 +247,8 @@ public int hashCode() { result = 31 * result + getUnverifiedIndexRowCount(); result = 
31 * result + getOldIndexRowCount(); result = 31 * result + getUnknownIndexRowCount(); + result = 31 * result + getExtraVerifiedIndexRowCount(); + result = 31 * result + getExtraUnverifiedIndexRowCount(); return (int) result; } @@ -303,6 +331,18 @@ public long getUnknownIndexRowCount() { public void setUnknownIndexRowCount(long unknownIndexRowCount) { this.unknownIndexRowCount = unknownIndexRowCount; } + + public long getExtraVerifiedIndexRowCount() { return extraVerifiedIndexRowCount; } + + public void setExtraVerifiedIndexRowCount(long extraVerifiedIndexRowCount) { + this.extraVerifiedIndexRowCount = extraVerifiedIndexRowCount; + } + + public long getExtraUnverifiedIndexRowCount() { return extraUnverifiedIndexRowCount; } + + public void setExtraUnverifiedIndexRowCount(long extraUnverifiedIndexRowCount) { + this.extraUnverifiedIndexRowCount = extraUnverifiedIndexRowCount; + } } private long scannedDataRowCount = 0; @@ -372,6 +412,10 @@ public long getBeforeRebuildMissingIndexRowCount() { public long getBeforeIndexHasExtraCellsCount() {return getBefore().getIndexHasExtraCellsCount(); } + public long getBeforeRepairExtraVerifiedIndexRowCount() { return getBefore().getExtraVerifiedIndexRowCount(); } + + public long getBeforeRepairExtraUnverifiedIndexRowCount() { return getBefore().getExtraUnverifiedIndexRowCount(); } + public long getAfterRebuildValidIndexRowCount() { return getAfter().getValidIndexRowCount(); } @@ -400,6 +444,10 @@ public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() { public long getAfterIndexHasExtraCellsCount() { return getAfter().getIndexHasExtraCellsCount(); } + public long getAfterRepairExtraVerifiedIndexRowCount() { return getAfter().getExtraVerifiedIndexRowCount(); } + + public long getAfterRepairExtraUnverifiedIndexRowCount() { return getAfter().getExtraUnverifiedIndexRowCount(); } + private void addScannedDataRowCount(long count) { this.setScannedDataRowCount(this.getScannedDataRowCount() + count); } @@ -451,6 +499,14 @@ 
public void addBeforeUnknownIndexRowCount(long count) { getBefore().setUnknownIndexRowCount(getBefore().getUnknownIndexRowCount() + count); } + public void addBeforeRepairExtraVerifiedIndexRowCount(long count) { + getBefore().setExtraVerifiedIndexRowCount(getBefore().getExtraVerifiedIndexRowCount() + count); + } + + public void addBeforeRepairExtraUnverifiedIndexRowCount(long count) { + getBefore().setExtraUnverifiedIndexRowCount(getBefore().getExtraUnverifiedIndexRowCount() + count); + } + private void addAfterRebuildValidIndexRowCount(long count) { getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count); } @@ -483,6 +539,14 @@ public void addAfterIndexHasExtraCellsCount(long count) { getAfter().setIndexHasExtraCellsCount(getAfter().getIndexHasExtraCellsCount() + count); } + public void addAfterRepairExtraVerifiedIndexRowCount(long count) { + getAfter().setExtraVerifiedIndexRowCount(getAfter().getExtraVerifiedIndexRowCount() + count); + } + + public void addAfterRepairExtraUnverifiedIndexRowCount(long count) { + getAfter().setExtraUnverifiedIndexRowCount(getAfter().getExtraUnverifiedIndexRowCount() + count); + } + private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) { if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0, @@ -525,6 +589,10 @@ public void update(Cell cell) { addBeforeOldIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES)) { addBeforeUnknownIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES)) { + addBeforeRepairExtraVerifiedIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES)) { + 
addBeforeRepairExtraUnverifiedIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) { addAfterRebuildValidIndexRowCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) { @@ -541,13 +609,18 @@ public void update(Cell cell) { addAfterIndexHasExtraCellsCount(getValue(cell)); } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES)) { addAfterIndexHasMissingCellsCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES)) { + addAfterRepairExtraVerifiedIndexRowCount(getValue(cell)); + } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES)) { + addAfterRepairExtraUnverifiedIndexRowCount(getValue(cell)); } } public boolean isVerificationFailed() { //we don't want to count max look back failures alone as failing an index rebuild job //so we omit them from the below calculation. 
- if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() > 0) { + if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() + + getAfter().getExtraVerifiedIndexRowCount() > 0) { return true; } return false; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java index 549f876980a..a0743dd084c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java @@ -92,6 +92,7 @@ public class IndexVerificationOutputRepository implements AutoCloseable { public enum IndexVerificationErrorType { INVALID_ROW, MISSING_ROW, + EXTRA_ROW, EXTRA_CELLS, BEYOND_MAX_LOOKBACK_INVALID, BEYOND_MAX_LOOKBACK_MISSING, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java index bf9b4bf81e1..89599e66314 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compat.hbase.CompatUtil; import org.apache.phoenix.coprocessor.IndexToolVerificationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.hbase.index.table.HTableFactory; @@ -122,6 +121,20 @@ public class IndexVerificationResultRepository implements AutoCloseable { public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS 
= "AfterRebuildInvalidIndexRowCountCozMissingCells"; public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS); + public final static String BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT = "BeforeRepairExtraVerifiedIndexRowCount"; + public final static byte[] BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT); + public final static String BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT = "BeforeRepairExtraUnverifiedIndexRowCount"; + public final static byte[] BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT); + + public final static String AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT = "AfterRepairExtraVerifiedIndexRowCount"; + public final static byte[] AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT); + public final static String AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT = "AfterRepairExtraUnverifiedIndexRowCount"; + public final static byte[] AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES = + Bytes.toBytes(AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT); + /*** * Only usable for read / create methods. 
To write use setResultTable and setIndexTable first */ @@ -241,6 +254,10 @@ public void logToIndexToolResultTable(IndexToolVerificationResult verificationRe Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildOldIndexRowCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildUnknownIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES, + Bytes.toBytes(Long.toString(verificationResult.getBeforeRepairExtraVerifiedIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES, + Bytes.toBytes(Long.toString(verificationResult.getBeforeRepairExtraUnverifiedIndexRowCount()))); } if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) { put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES, @@ -259,6 +276,10 @@ public void logToIndexToolResultTable(IndexToolVerificationResult verificationRe Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount()))); put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES, Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES, + Bytes.toBytes(Long.toString(verificationResult.getAfterRepairExtraVerifiedIndexRowCount()))); + put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES, + Bytes.toBytes(Long.toString(verificationResult.getAfterRepairExtraUnverifiedIndexRowCount()))); } resultTable.put(put); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 8cee869e345..953e61f3af5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -94,6 +94,10 @@ private void updateCounters(IndexTool.IndexVerifyType verifyType, setValue(verificationResult.getBeforeRebuildOldIndexRowCount()); context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT). setValue(verificationResult.getBeforeRebuildUnknownIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT). + setValue(verificationResult.getBeforeRepairExtraVerifiedIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT). + setValue(verificationResult.getBeforeRepairExtraUnverifiedIndexRowCount()); } if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) { context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT). @@ -112,6 +116,10 @@ private void updateCounters(IndexTool.IndexVerifyType verifyType, setValue(verificationResult.getAfterIndexHasExtraCellsCount()); context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS). setValue(verificationResult.getAfterIndexHasMissingCellsCount()); + context.getCounter(PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT). + setValue(verificationResult.getAfterRepairExtraVerifiedIndexRowCount()); + context.getCounter(PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT). + setValue(verificationResult.getAfterRepairExtraUnverifiedIndexRowCount()); } if (verificationResult.isVerificationFailed()) { throw new IOException("Index verification failed! 
" + verificationResult); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java index c6c6ec79b71..8016bce87f6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java @@ -35,6 +35,8 @@ public enum PhoenixIndexToolJobCounters { BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT, BEFORE_REBUILD_OLD_INDEX_ROW_COUNT, BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT, + BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT, + BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT, AFTER_REBUILD_VALID_INDEX_ROW_COUNT, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT, @@ -42,5 +44,7 @@ public enum PhoenixIndexToolJobCounters { AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS, - AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS + AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS, + AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT, + AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java index b7eefc643fe..bf477d2215c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java @@ -357,7 +357,7 @@ public void testVerifySingleIndexRow_allUnverified() throws IOException { @Test public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException { IndexToolVerificationResult.PhaseResult - expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 
0, 0,0,0); + expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0, 0, 0, 0, 0); try { for (Map.Entry> entry : indexKeyToMutationMap.entrySet()) { @@ -510,7 +510,7 @@ public void testVerifySingleIndexRow_compactionOnIndexTable_atLeastOneExpectedMu assertTrue(rebuildScanner.verifySingleIndexRow(indexRow.getRow(), actualMutations, indexKeyToMutationMap.get(indexRow.getRow()), mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, false)); // validIndexRowCount = 1 - IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0); + IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0, 0, 0); assertTrue(actualPR.equals(expectedPR)); } @@ -566,7 +566,7 @@ public void testVerifySingleIndexRow_compactionOnIndexTable_noExpectedMutationWi // Report this validation as a failure assertFalse(rebuildScanner.verifySingleIndexRow(indexRow.getRow(), actualMutations, expectedMutations, mostRecentIndexRowKeys, new ArrayList(), actualPR, true)); // beyondMaxLookBackInvalidIndexRowCount = 1 - IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0); + IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0, 0, 0); assertTrue(actualPR.equals(expectedPR)); } @@ -580,11 +580,11 @@ private static byte[] generateIndexRowKey(String dataRowKey, String dataVal){ } private IndexToolVerificationResult.PhaseResult getValidPhaseResult() { - return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0); + return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0, 0, 0); } private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() { - return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0, 0, 0); + return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0, 0, 0, 
0, 0); } private void initializeLocalMockitoSetup(Map.Entry> entry,