PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells
gjacoby126 committed Dec 20, 2019
1 parent 782effc commit 29bc10e
Showing 2 changed files with 215 additions and 22 deletions.
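The commit introduces a "max lookback" window, configured by phoenix.max.lookback.age.seconds (default: one hour). GlobalIndexChecker now overrides the flush and compaction scanner hooks so that cells deleted within that window are retained, and a read pinned to an SCN from before the delete still sees them; the new integration test below verifies exactly this. What follows is a minimal sketch of such an SCN read, not part of the commit; the JDBC URL and table name are hypothetical, and only PhoenixRuntime.CURRENT_SCN_ATTRIB is taken from the diff below.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.Properties;

import org.apache.phoenix.util.PhoenixRuntime;

public class ScnReadSketch {
    public static void main(String[] args) throws Exception {
        // a timestamp (epoch millis) captured before the DELETE was issued
        long scnBeforeDelete = Long.parseLong(args[0]);
        Properties props = new Properties();
        // pin the connection to that timestamp; reads behave as of that moment
        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scnBeforeDelete));
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);
             ResultSet rs = conn.createStatement()
                     .executeQuery("SELECT * FROM MY_TABLE WHERE VAL1 = 'ab'")) {
            // within phoenix.max.lookback.age.seconds of the delete the row is still
            // visible; once the window passes and a major compaction runs, it is purged
            System.out.println("row visible at SCN? " + rs.next());
        }
    }
}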
GlobalIndexCheckerIT.java
@@ -34,14 +34,20 @@
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -54,9 +60,10 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
private final boolean async;
private final String tableDDLOptions;

private final boolean encoded;
public GlobalIndexCheckerIT(boolean async, boolean encoded) {
this.async = async;
this.encoded = encoded;
StringBuilder optionBuilder = new StringBuilder();
if (!encoded) {
optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
@@ -305,17 +312,8 @@ public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
public void testOnePhaseOverwrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexTableName = generateUniqueName();
createTableAndIndexes(conn, dataTableName, indexTableName);
// Configure IndexRegionObserver to skip the last two write phases (i.e., the data table update
// and post index update phases) and check that this does not impact correctness (one overwrite)
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -385,21 +383,26 @@ }
}
}

private void createTableAndIndexes(Connection conn, String dataTableName, String indexTableName) throws Exception {
populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
if (async) {
// run the index MR job.
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
}
}

@Test
public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
if (async) {
// run the index MR job.
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
}
createTableAndIndexes(conn, dataTableName, indexName);
// Configure IndexRegionObserver to fail the last two write phases (i.e., the data table update and post index update phases)
// and check that this does not impact the correctness
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
@@ -490,6 +493,68 @@ public void testViewIndexRowUpdate() throws Exception {
}
}

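// Verifies PHOENIX-5645: with phoenix.max.lookback.age.seconds set to 3, a row deleted
// just before a major compaction must still be readable at an SCN taken before the
// delete, and may only age out after the 3-second window has elapsed.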
@Test(timeout=60000L)
public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
// No need to run this test for every parameterization, so run it only when async and encoded are both false
if (async || encoded){
return;
}
Configuration conf = getUtility().getConfiguration();
long oldLookbackAge = conf.getLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
GlobalIndexChecker.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, 3L);
String dataTableName = generateUniqueName();
String indexStem = generateUniqueName();
createTableAndIndexes(conn, dataTableName, indexStem);
String fullIndexName = indexStem + "1";
long beforeDeleteSCN = EnvironmentEdgeManager.currentTimeMillis();
conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE " +
" id = 'a'");
conn.commit();
long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
TableName indexTable = TableName.valueOf(fullIndexName);
// select statement to fetch the row we just deleted
String sql = "SELECT * FROM " + dataTableName + " WHERE val1 = 'ab'";
assertExplainPlan(conn, sql, dataTableName, fullIndexName);
assertRowExistsAtSCN(sql, beforeDeleteSCN, true);
majorCompact(indexTable, beforeFirstCompactSCN);
assertRowExistsAtSCN(sql, beforeDeleteSCN, true);
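// wait out the 3-second lookback window; a read at an SCN taken after the delete
// must no longer see the row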
Thread.sleep(3000);
long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis();
assertRowExistsAtSCN(sql, beforeSecondCompactSCN, false);
} finally {
conf.setLong(GlobalIndexChecker.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, oldLookbackAge);
}
}

private void assertRowExistsAtSCN(String sql, long scn, boolean shouldExist)
throws Exception {
boolean rowExists = false;
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
try (Connection conn = DriverManager.getConnection(getUrl(), props)){
ResultSet rs = conn.createStatement().executeQuery(sql);
rowExists = rs.next();
}
if (shouldExist){
Assert.assertTrue("Row was not found at time " + scn +
" when it should have been ",
rowExists);
} else {
Assert.assertFalse("Row was found at time " + scn +
" when it should not have been", rowExists);
}
}

private void majorCompact(TableName table, long compactionRequestedSCN) throws Exception {
Admin admin = getUtility().getHBaseAdmin();
admin.majorCompact(table);
// wait until a major compaction that started after the request has completed
while (admin.getLastMajorCompactionTimestamp(table) < compactionRequestedSCN) {
Thread.sleep(100);
}
}

static private void commitWithException(Connection conn) {
try {
conn.commit();
GlobalIndexChecker.java
@@ -26,17 +26,21 @@
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
@@ -48,9 +52,15 @@
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -91,6 +101,9 @@
*/
public class GlobalIndexChecker extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
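// Cells deleted within this many seconds are protected from being purged by
// flushes and compactions (default: one hour).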
public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
"phoenix.max.lookback.age.seconds";
public static final long DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 60 * 60;
private HTableFactory hTableFactory;
private GlobalIndexCheckerSource metricsSource;
public enum RebuildReturnCode {
@@ -534,4 +547,119 @@ public void start(CoprocessorEnvironment e) throws IOException {
public void stop(CoprocessorEnvironment e) throws IOException {
this.hTableFactory.shutdown();
}

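// A memstore flush enforces the column family's TTL and max versions, so without this
// override a flush inside the lookback window could drop cell versions that an SCN
// read within phoenix.max.lookback.age.seconds still needs.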
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store,
final KeyValueScanner memstoreScanner,
final InternalScanner s)
throws IOException {

if (!isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
return s;
}

// close the scanner that a previous coprocessor in the chain may have created
if (s != null) {
s.close();
}

// Called when flushing the memstore to disk. Retain all delete markers and all
// cell versions so that reads within the max lookback window still see them.
Scan scan = new Scan();
ScanInfo oldScanInfo = store.getScanInfo();

Configuration conf = c.getEnvironment().getConfiguration();
ScanInfo scanInfo =
getScanInfo(conf, oldScanInfo, store);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating the store scanner with: " + scanInfo + ", scan object: " + scan +
" for table " + store.getTableName().getNameAsString() +
" and region " + store.getRegionInfo().getRegionNameAsString() +
" and cf " + store.getColumnFamilyName());
}
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.LATEST_TIMESTAMP);
}

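// Compactions get the same treatment: swap in a StoreScanner whose ScanInfo keeps all
// versions and keeps deleted cells (KeepDeletedCells.TTL) until the adjusted TTL,
// i.e. at least the max lookback age, has passed.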
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
final InternalScanner s) throws IOException {

if (!isMaxLookbackTimeEnabled(c.getEnvironment().getConfiguration())){
return s;
}
// close the scanner that a previous coprocessor in the chain may have created
if (s != null) {
s.close();
}
Scan scan = new Scan();
ScanInfo oldScanInfo = store.getScanInfo();

if (LOG.isDebugEnabled()) {
LOG.debug("Compaction triggered with scanType " + scanType + " for table " +
store.getTableName().getNameAsString() + " and region " +
store.getRegionInfo().getRegionNameAsString() +
" and cf " + store.getColumnFamilyName());
}

Configuration conf = c.getEnvironment().getConfiguration();
ScanInfo scanInfo =
getScanInfo(conf, oldScanInfo, store);
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
store.getSmallestReadPoint(),
earliestPutTs);
}

/*
 * If the column family is configured with KeepDeletedCells.FALSE or KeepDeletedCells.TTL,
 * use KeepDeletedCells.TTL so that delete markers age out once the max lookback window has passed.
 */
private KeepDeletedCells getKeepDeletedCells(final Store store) {
return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE) ?
KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
}

private ScanInfo getScanInfo(Configuration conf, ScanInfo oldScanInfo,
final Store store) {
long ttl = getTimeToLive(store.getFamily(), oldScanInfo);
KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store);
return new ScanInfo(conf, store.getFamily().getName(), oldScanInfo.getMinVersions(),
Integer.MAX_VALUE, ttl, keepDeletedCells,
oldScanInfo.getTimeToPurgeDeletes(),
oldScanInfo.getComparator());
}

private long getTimeToLive(HColumnDescriptor columnDescriptor, ScanInfo scanInfo) {
long ttl = scanInfo.getTtl();
long maxLookbackAge = getMaxLookbackAgeMillis(scanInfo.getConfiguration());

// If the user kept the default TTL (FOREVER) and KeepDeletedCells is FALSE or TTL,
// lower the TTL to the max lookback age so that unwanted delete markers can still be
// purged once they fall outside the lookback window; otherwise make sure the TTL is
// at least as long as the lookback window
if (scanInfo.getTtl() == Long.MAX_VALUE
&& columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
ttl = maxLookbackAge;
} else {
ttl = Math.max(ttl, maxLookbackAge);
}
return ttl;
}

// the configured lookback age is in seconds, while ScanInfo TTLs are in milliseconds
private long getMaxLookbackAgeMillis(Configuration conf) {
return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000L;
}

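// a lookback age of zero (or a negative value) disables the feature entirely,
// restoring stock flush and compaction behavior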
private boolean isMaxLookbackTimeEnabled(Configuration conf) {
return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) > 0L;
}

}
