diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 58613239759..6a4f910f865 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -34,14 +34,20 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -54,9 +60,10 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
     private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
     private final boolean async;
     private final String tableDDLOptions;
-
+    private final boolean encoded;
     public GlobalIndexCheckerIT(boolean async, boolean encoded) {
         this.async = async;
+        this.encoded = encoded;
         StringBuilder optionBuilder = new StringBuilder();
         if (!encoded) {
             optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
@@ -305,17 +312,8 @@ public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
     public void testOnePhaseOverwrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
-            populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
             String indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
-                    dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
-                    dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-            if (async) {
-                // run the index MR job.
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
-                IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
-            }
+            createTableAndIndexes(conn, dataTableName, indexTableName);
             // Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index
             // update phase) and check that this does not impact the correctness (one overwrite)
             IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -385,21 +383,26 @@ public void testOnePhaseOverwrite() throws Exception {
         }
     }
 
+    private void createTableAndIndexes(Connection conn, String dataTableName, String indexTableName) throws Exception {
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
+                dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
+        }
+    }
+
     @Test
     public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
-        String dataTableName = generateUniqueName();
-        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
         String indexName = generateUniqueName();
-        conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
-                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
-        conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
-                dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
-        if (async) {
-            // run the index MR job.
-            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
-            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
-        }
+            createTableAndIndexes(conn, dataTableName, indexName);
         // Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase)
         // and check that this does not impact the correctness
         IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -490,6 +493,68 @@ public void testViewIndexRowUpdate() throws Exception {
         }
     }
 
+    @Test(timeout=60000L)
+    public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
+        //don't need to do this test multiple times, so only async + encoded = false
+        if (async || encoded){
+            return;
+        }
+        Configuration conf = getUtility().getConfiguration();
+        long oldLookbackAge = conf.getLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            GlobalIndexChecker.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 3L);
+            String dataTableName = generateUniqueName();
+            String indexStem = generateUniqueName();
+            createTableAndIndexes(conn, dataTableName, indexStem);
+            String fullIndexName = indexStem + "1";
+            long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
+            conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE " +
+                " id = 'a'");
+            conn.commit();
+            long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            TableName indexTable = TableName.valueOf(fullIndexName);
+            //select stmt to get row we deleted
+            String sql = "SELECT * FROM " + dataTableName + " WHERE val1 = 'ab'";
+            assertExplainPlan(conn, sql, dataTableName, fullIndexName);
+            assertRowExistsAtSCN(sql, beforeDeleteSCN, true);
+            majorCompact(indexTable, beforeFirstCompactSCN);
+            assertRowExistsAtSCN(sql, beforeDeleteSCN, true);
+            Thread.sleep(3000);
+            long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
+            assertRowExistsAtSCN(sql, beforeSecondCompactSCN, false);
+        } finally{
+            conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, oldLookbackAge);
+        }
+    }
+
+    private void assertRowExistsAtSCN(String sql, long scn, boolean shouldExist)
+        throws Exception {
+        boolean rowExists = false;
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)){
+            ResultSet rs = conn.createStatement().executeQuery(sql);
+            rowExists = rs.next();
+        }
+        if (shouldExist){
+            Assert.assertTrue("Row was not found at time " + scn +
+                " when it should have been ",
+                rowExists);
+        } else {
+            Assert.assertFalse("Row was found at time " + scn +
+                " when it should not have been", rowExists);
+        }
+    }
+
+    private void majorCompact(TableName table, long afterDeleteSCN) throws Exception {
+        Admin admin = getUtility().getHBaseAdmin();
+        admin.majorCompact(table);
+        while (admin.getLastMajorCompactionTimestamp(table) < afterDeleteSCN){
+            Thread.sleep(100);
+        }
+    }
+
     static private void commitWithException(Connection conn) {
         try {
             conn.commit();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index c7f79aed753..d5f1c28b61f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -26,17 +26,21 @@
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -48,9 +52,15 @@
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -91,6 +101,9 @@
  */
 public class GlobalIndexChecker extends BaseRegionObserver {
     private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
+    public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
+        "phoenix.max.lookback.age.seconds";
+    public static final long DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 60 * 60;
     private HTableFactory hTableFactory;
     private GlobalIndexCheckerSource metricsSource;
     public enum RebuildReturnCode {
@@ -534,4 +547,119 @@ public void start(CoprocessorEnvironment e) throws IOException {
     public void stop(CoprocessorEnvironment e) throws IOException {
         this.hTableFactory.shutdown();
     }
+
+    @Override
+    public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store,
+            final KeyValueScanner memstoreScanner,
+            final InternalScanner s)
+            throws IOException {
+
+        if (!isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+
+        // Called during flushing the memstore to disk.
+        // Need to retain all the delete markers & all the versions
+        Scan scan = new Scan();
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        ScanInfo scanInfo =
+            getScanInfo(conf, oldScanInfo, store);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating the store scanner with :" + scanInfo + ", " +
+                "scan object:" + scan + " for table " + store.getTableName().getNameAsString() +
+                " and region " + store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+        return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
+            ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
+            HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(
+            final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+            final InternalScanner s) throws IOException {
+
+        if (!isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
+            return s;
+        }
+        //close last scanner object before creating a new one
+        if(s != null) {
+            s.close();
+        }
+        Scan scan = new Scan();
+        ScanInfo oldScanInfo = store.getScanInfo();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Compaction triggering for table:" +
+                store.getRegionInfo().getTable().toString() +
+                " with scanType " + scanType + " for table " +
+                store.getTableName().getNameAsString() + " and region " +
+                store.getRegionInfo().getRegionNameAsString() +
+                " and cf " + store.getColumnFamilyName());
+        }
+
+        Configuration conf = c.getEnvironment().getConfiguration();
+        ScanInfo scanInfo =
+            getScanInfo(conf, oldScanInfo, store);
+        return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+            store.getSmallestReadPoint(),
+            earliestPutTs);
+    }
+
+    /*
+     * If KeepDeletedCells.FALSE, KeepDeletedCells.TTL,
+     * let delete markers age once lookback age is done.
+     */
+    private KeepDeletedCells getKeepDeletedCells(final Store store) {
+        return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE) ?
+            KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
+    }
+
+    private ScanInfo getScanInfo(Configuration conf, ScanInfo oldScanInfo,
+            final Store store) {
+        long ttl = getTimeToLive(store.getFamily(), oldScanInfo);
+        KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store);
+        return new ScanInfo(conf, store.getFamily().getName(), oldScanInfo.getMinVersions(),
+            Integer.MAX_VALUE, ttl, keepDeletedCells,
+            oldScanInfo.getTimeToPurgeDeletes(),
+            oldScanInfo.getComparator());
+    }
+
+    private long getTimeToLive(HColumnDescriptor columnDescriptor, ScanInfo scanInfo) {
+        long ttl = scanInfo.getTtl();
+        long maxLookbackTime = getMaxLookbackTime(scanInfo.getConfiguration());
+
+        // If user configured default TTL(FOREVER) and keep deleted cells to false or
+        // TTL then to remove unwanted delete markers we should change ttl to max lookback age
+        if (scanInfo.getTtl() == Long.MAX_VALUE
+            && columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
+            ttl = maxLookbackTime;
+        } else {
+            ttl = Math.max(ttl, maxLookbackTime);
+        }
+        return ttl;
+    }
+
+    private long getMaxLookbackTime(Configuration conf){
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        long maxAge = conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
+        return now - maxAge;
+    }
+
+    private boolean isMaxLookbackTimeEnabled(Configuration conf){
+        return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+            DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) > 0L;
+    }
+
 }
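
Reviewer note (illustrative sketch, not part of the patch): the new "phoenix.max.lookback.age.seconds" key is read from the server-side HBase Configuration by GlobalIndexChecker above, and a value of 0 or less makes isMaxLookbackTimeEnabled() return false, skipping the new flush/compaction hooks. The class name and the 10-minute value below are hypothetical, chosen only to show how the key could be set.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.index.GlobalIndexChecker;

// Hypothetical example; in a real deployment the property would live in hbase-site.xml
// on the region servers rather than be set programmatically.
public class MaxLookbackConfigSketch {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        // Keep recently deleted index rows readable for 10 minutes (the key is in seconds).
        conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 10 * 60L);

        // Setting the key to 0 (or a negative value) disables the retention behavior:
        // conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 0L);

        System.out.println("max lookback age (seconds) = "
            + conf.getLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
                GlobalIndexChecker.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
    }
}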