Skip to content

Commit

Permalink
PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells (
Browse files Browse the repository at this point in the history
#662)

PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells
  • Loading branch information
gjacoby126 committed Jan 13, 2020
1 parent 9ab5ea8 commit a37980c
Show file tree
Hide file tree
Showing 11 changed files with 840 additions and 60 deletions.
350 changes: 350 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackIT.java

Large diffs are not rendered by default.

72 changes: 59 additions & 13 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/SCNIT.java
Expand Up @@ -24,11 +24,17 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.junit.Test;

public class SCNIT extends ParallelStatsDisabledIT {
Expand Down Expand Up @@ -67,7 +73,7 @@ public void testReadBeforeDelete() throws Exception {
rs.close();
}
props.clear();
props.setProperty("CurrentSCN", Long.toString(timeAfterDelete));
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(timeAfterDelete));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertTrue(rs.next());
Expand All @@ -82,28 +88,68 @@ public void testReadBeforeDelete() throws Exception {

@Test
public void testSCNWithTTL() throws Exception {
    // Rows in a table with a short TTL must be invisible to a query whose SCN
    // falls after the rows have expired (but is still newer than the TTL window).
    final int ttlSeconds = 2;
    String tableName = createTableWithTTL(ttlSeconds);
    // sleep one second longer than the TTL so all upserted rows have expired
    Thread.sleep((ttlSeconds * 1000) + 1000);
    long scn = EnvironmentEdgeManager.currentTime() - 1000;
    Properties scnProps = new Properties();
    scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
    try (Connection conn = DriverManager.getConnection(getUrl(), scnProps);
         ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName)) {
        // expired rows must not be returned
        assertFalse(rs.next());
    }
}

@Test
public void testTooLowSCNWithTTL() throws Exception {
    //if scn is for an older time than a table's ttl, it should throw a SQLException
    final int ttlSeconds = 2;
    String tableName = createTableWithTTL(ttlSeconds);
    int sleepMillis = (ttlSeconds + 1) * 1000;
    //need to sleep long enough for the SCN to still find the syscat row for the table
    Thread.sleep(sleepMillis);
    Properties scnProps = new Properties();
    scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
        Long.toString(EnvironmentEdgeManager.currentTime() - sleepMillis));
    try (Connection conn = DriverManager.getConnection(getUrl(), scnProps)) {
        conn.createStatement()
            .executeQuery(String.format("select * from %s", tableName));
        // reaching this point means no SQLException was raised — the test fails
        Assert.fail("We should have thrown an exception for the too-early SCN");
    } catch (SQLException se) {
        // verify the specific Phoenix error code, not just "some SQLException"
        assertSqlExceptionCode(SQLExceptionCode.CANNOT_QUERY_TABLE_WITH_SCN_OLDER_THAN_TTL, se);
    }
}

// Asserts that the thrown SQLException matches the expected Phoenix error code
// on all three axes: numeric error code, message text, and SQLSTATE.
private void assertSqlExceptionCode(SQLExceptionCode expected, SQLException actual) {
    assertEquals(expected.getErrorCode(), actual.getErrorCode());
    assertTrue("Wrong error message", actual.getMessage().contains(expected.getMessage()));
    assertEquals(expected.getSQLState(), actual.getSQLState());
}

private String createTableWithTTL(int ttl) throws SQLException, InterruptedException {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
StringBuilder optionsBuilder = new StringBuilder();
if (ttl > 0){
optionsBuilder.append("TTL=");
optionsBuilder.append(ttl);
}
String ddlOptions = optionsBuilder.toString();
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement()
.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) TTL=2");
.execute(String.format("CREATE TABLE %s" +
"(k VARCHAR PRIMARY KEY, v VARCHAR) %s", fullTableName, ddlOptions));
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','aa')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('c','cc')");
conn.commit();
// TTL is 2 sec
Thread.sleep(3000);
}

Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(EnvironmentEdgeManager.currentTime() - 1000));
try (Connection connscn = DriverManager.getConnection(getUrl(), props)) {
ResultSet rs = connscn.createStatement().executeQuery("select * from " + fullTableName);
assertFalse(rs.next());
rs.close();
}
return fullTableName;
}

}
Expand Up @@ -42,6 +42,7 @@
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -53,15 +54,12 @@
public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
private static final Log LOG = LogFactory.getLog(GlobalIndexCheckerIT.class);
private final boolean async;
private final String tableDDLOptions;

private String tableDDLOptions;
private StringBuilder optionBuilder;
private final boolean encoded;
/**
 * Parameterized constructor: only records the run flags. The DDL options string
 * is built per-test in {@code beforeTest()} (the @Before method), so building it
 * here as well would be redundant and shadow the {@code optionBuilder} field.
 */
public GlobalIndexCheckerIT(boolean async, boolean encoded) {
    this.async = async;
    this.encoded = encoded;
}

@BeforeClass
Expand All @@ -71,6 +69,15 @@ public static synchronized void doSetup() throws Exception {
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

@Before
public void beforeTest(){
    // Rebuild the DDL options before every test so each parameterized run
    // starts from a fresh option string rather than accumulating appends.
    StringBuilder options = new StringBuilder();
    if (!encoded) {
        options.append(" COLUMN_ENCODED_BYTES=0");
    }
    optionBuilder = options;
    this.tableDDLOptions = optionBuilder.toString();
}

@Parameters(
name = "async={0},encoded={1}")
public static synchronized Collection<Object[]> data() {
Expand Down Expand Up @@ -305,17 +312,8 @@ public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
public void testOnePhaseOverwrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
String indexTableName = generateUniqueName();
conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
if (async) {
// run the index MR job.
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
}
createTableAndIndexes(conn, dataTableName, indexTableName);
// Configure IndexRegionObserver to skip the last two write phase (i.e., the data table update and post index
// update phase) and check that this does not impact the correctness (one overwrite)
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
Expand Down Expand Up @@ -385,21 +383,35 @@ public void testOnePhaseOverwrite() throws Exception {
}
}

// Convenience overload: create the data table and both covered indexes with a
// single max version (indexVersions = 1) on the index tables.
private void createTableAndIndexes(Connection conn, String dataTableName,
String indexTableName) throws Exception {
createTableAndIndexes(conn, dataTableName, indexTableName, 1);
}

// Seeds the data table, then creates two covered indexes over it (on val1 and
// val2 respectively), each with the requested number of max versions. For async
// indexes the IndexTool MR job is run to actually build them.
private void createTableAndIndexes(Connection conn, String dataTableName,
    String indexTableName, int indexVersions) throws Exception {
    populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
    String asyncClause = async ? "ASYNC" : "";
    String versionsClause = " VERSIONS=" + indexVersions;
    conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " +
        dataTableName + " (val1) include (val2, val3)" + asyncClause + versionsClause);
    conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " +
        dataTableName + " (val2) include (val1, val3)" + asyncClause + versionsClause);
    conn.commit();
    if (async) {
        // run the index MR job.
        IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1");
        IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "2");
    }
}

@Test
public void testFailDataTableAndPostIndexRowUpdate() throws Exception {
String dataTableName = generateUniqueName();
populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')

try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
String indexName = generateUniqueName();
conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
if (async) {
// run the index MR job.
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
}
createTableAndIndexes(conn, dataTableName, indexName);
// Configure IndexRegionObserver to fail the last two write phase (i.e., the data table update and post index update phase)
// and check that this does not impact the correctness
IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
Expand Down
Expand Up @@ -20,28 +20,108 @@
import java.io.IOException;
import java.util.NavigableSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.client.Scan;

public class ScanInfoUtil {
public static final String PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY =
"phoenix.max.lookback.age.seconds";
public static final int DEFAULT_PHOENIX_MAX_LOOKBACK_AGE = 0;

private ScanInfoUtil() {
}

public static boolean isKeepDeletedCells(ScanInfo scanInfo) {
return scanInfo.getKeepDeletedCells() != KeepDeletedCells.FALSE;
}

public static ScanInfo cloneScanInfoWithKeepDeletedCells(ScanInfo scanInfo) {
return new ScanInfo(scanInfo.getConfiguration(), scanInfo.getFamily(), scanInfo.getMinVersions(),
scanInfo.getMaxVersions(), scanInfo.getTtl(), KeepDeletedCells.TRUE,
scanInfo.getTimeToPurgeDeletes(), scanInfo.getComparator());
}

public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,long readPt) throws IOException {
public static StoreScanner createStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
final NavigableSet<byte[]> columns,long readPt)
throws IOException {
if(!scan.isReversed()) {
return new StoreScanner(store, scanInfo, scan, columns,readPt);
} else {
return new ReversedStoreScanner(store, scanInfo, scan, columns,readPt);
}
}

public static long getTimeToLiveForCompactions(HColumnDescriptor columnDescriptor,
ScanInfo scanInfo) {
long ttl = scanInfo.getTtl();
long maxLookbackTtl = getMaxLookback(scanInfo.getConfiguration());
if (isMaxLookbackTimeEnabled(maxLookbackTtl)) {
if (ttl == Long.MAX_VALUE
&& columnDescriptor.getKeepDeletedCells() != KeepDeletedCells.TRUE) {
// If user configured default TTL(FOREVER) and keep deleted cells to false or
// TTL then to remove unwanted delete markers we should change ttl to max lookback age
ttl = maxLookbackTtl;
} else {
//if there is a TTL, use TTL instead of max lookback age.
// Max lookback age should be more recent or equal to TTL
ttl = Math.max(ttl, maxLookbackTtl);
}
}

return ttl;
}

/*
* If KeepDeletedCells.FALSE, KeepDeletedCells.TTL ,
* let delete markers age once lookback age is done.
*/
private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType scanType) {
//if we're doing a minor compaction or flush, always set keep deleted cells
//to true. Otherwise, if keep deleted cells is false or TTL, use KeepDeletedCells TTL,
//where the value of the ttl might be overriden to the max lookback age elsewhere
return (store.getFamily().getKeepDeletedCells() == KeepDeletedCells.TRUE
|| scanType.equals(ScanType.COMPACT_RETAIN_DELETES)) ?
KeepDeletedCells.TRUE : KeepDeletedCells.TTL;
}

/*
* if the user set a TTL we should leave MIN_VERSIONS at the default (0 in most of the cases).
* Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
* Math.max(maxVersions, minVersions, 1)
*/
private static int getMinVersions(ScanInfo oldScanInfo, final Store store) {
return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions()
: Math.max(Math.max(store.getFamily().getMinVersions(),
store.getFamily().getMaxVersions()),1);
}

public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf,
ScanInfo oldScanInfo,
final Store store,
ScanType type) {
long ttl = getTimeToLiveForCompactions(store.getFamily(), oldScanInfo);
KeepDeletedCells keepDeletedCells = getKeepDeletedCells(store, type);
int minVersions = getMinVersions(oldScanInfo, store);
return new ScanInfo(conf,store.getFamily().getName(), minVersions,
Integer.MAX_VALUE, ttl, keepDeletedCells,
oldScanInfo.getTimeToPurgeDeletes(),
oldScanInfo.getComparator());
}

private static long getMaxLookback(Configuration conf){
//config param is in seconds, switch to millis
return conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE) * 1000;
}

public static boolean isMaxLookbackTimeEnabled(Configuration conf){
return isMaxLookbackTimeEnabled(conf.getLong(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
}

private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
return maxLookbackTime > 0L;
}
}

0 comments on commit a37980c

Please sign in to comment.