PHOENIX-5645 - GlobalIndexChecker should prevent compaction from purging very recently deleted cells
gjacoby126 committed Jan 10, 2020
1 parent a69d182 commit 0773996
Showing 4 changed files with 87 additions and 7 deletions.
@@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryServices;
@@ -48,6 +49,7 @@
import java.util.Map;
import java.util.Properties;

import static org.apache.phoenix.util.TestUtil.assertRawCellCount;
import static org.apache.phoenix.util.TestUtil.assertRawRowCount;
import static org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
@@ -243,6 +245,7 @@ public void testRecentMaxVersionsNotCompactedAway() throws Exception {
long[] allSCNs = {afterInsertSCN, afterFirstUpdateSCN, afterSecondUpdateSCN};
assertMultiVersionLookbacks(dataTableSelectSql, allValues, allSCNs);
assertMultiVersionLookbacks(indexTableSelectSql, allValues, allSCNs);

flush(dataTable);
flush(indexTable);
//after flush, check to make sure we can see all three versions at the appropriate times
Expand All @@ -258,9 +261,15 @@ public void testRecentMaxVersionsNotCompactedAway() throws Exception {
long afterLookbackAgeSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
majorCompact(dataTable, afterLookbackAgeSCN);
majorCompact(indexTable, afterLookbackAgeSCN);
long afterSecondCompactionSCN = org.apache.phoenix.util.EnvironmentEdgeManager.currentTimeMillis();
//we expect 3 rows -- 2 for row 1 and 1 for row 2, which we never touched.
assertRawRowCount(conn, dataTable, 3);
//empty column, 1 version of val 1, 3 versions of val2, 1 version of val3 = 6
assertRawCellCount(conn, dataTable, Bytes.toBytes("a"), 6);
//2 versions of empty column, 2 versions of val2,
// 2 versions of val3 (since we write whole rows to index) = 6
assertRawCellCount(conn, indexTable, Bytes.toBytes("ab\u0000a"), 6);
//empty column + 1 version each of val1,2 and 3 = 4
assertRawCellCount(conn, dataTable, Bytes.toBytes("b"), 4);
//1 version of empty column, 1 version of val2, 1 version of val3 = 3
assertRawCellCount(conn, indexTable, Bytes.toBytes("bc\u0000b"), 3);
}
}

@@ -88,11 +88,13 @@ private static KeepDeletedCells getKeepDeletedCells(final Store store, ScanType

/*
* if the user set a TTL we should leave MIN_VERSIONS at the default (0 in most of the cases).
* Otherwise the data (1st version) will not be removed after the TTL.
* Otherwise the data (1st version) will not be removed after the TTL. If no TTL, we want
* Math.max(maxVersions, minVersions, 1)
*/
private static int getMinVersions(ScanInfo oldScanInfo, final Store store) {
return oldScanInfo.getTtl() != Long.MAX_VALUE ? store.getFamily().getMinVersions()
: Math.max(store.getFamily().getMinVersions(), 1);
: Math.max(Math.max(store.getFamily().getMinVersions(),
store.getFamily().getMaxVersions()),1);
}
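As a rough illustration of the rule described in the comment above, the hypothetical helper below (not a Phoenix or HBase API; the input values are made-up examples) mirrors the effective MIN_VERSIONS that getMinVersions() picks:

// A minimal sketch, assuming the logic above. effectiveMinVersions() and its
// arguments are illustrative only.
static int effectiveMinVersions(long ttlMillis, int minVersions, int maxVersions) {
    // With a TTL set, leave MIN_VERSIONS at the family default so the TTL can still purge data;
    // with no TTL, keep at least max(minVersions, maxVersions, 1) versions through flush/compaction.
    return ttlMillis != Long.MAX_VALUE
            ? minVersions
            : Math.max(Math.max(minVersions, maxVersions), 1);
}

// Example values:
// effectiveMinVersions(Long.MAX_VALUE, 0, 1) -> 1  (no TTL, default family settings)
// effectiveMinVersions(Long.MAX_VALUE, 0, 3) -> 3  (no TTL, all MAX_VERSIONS survive)
// effectiveMinVersions(86_400_000L, 0, 3)    -> 0  (TTL set, defer to the family MIN_VERSIONS)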

public static ScanInfo getScanInfoForFlushesAndCompactions(Configuration conf,
@@ -71,6 +71,7 @@
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
@@ -1546,9 +1547,15 @@ public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoproce
// This will lead to failure of cross cluster RPC if the effective user is not
// the login user. Switch to the login user context to ensure we have the expected
// security context.

final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
// since we will make a call to syscat, do nothing if we are compacting syscat itself
if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
// also, if max lookback age is already configured, we're already taken care of elsewhere
// unless the index stays disabled beyond the max lookback age, in which case you probably
// want to rebuild anyway
if (request.isMajor() &&
!ScanInfoUtil.isMaxLookbackTimeEnabled(compactionConfig) &&
!PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
@Override
public InternalScanner run() throws Exception {
64 changes: 63 additions & 1 deletion phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -49,12 +49,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -871,6 +874,13 @@ public static void createTransactionalTable(Connection conn, String tableName, S
conn.createStatement().execute("create table " + tableName + TestUtil.TEST_TABLE_SCHEMA + "TRANSACTIONAL=true" + (extraProps.length() == 0 ? "" : ("," + extraProps)));
}

public static void dumpTable(Connection conn, TableName tableName)
throws SQLException, IOException{
ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
HTableInterface table = cqs.getTable(tableName.getName());
dumpTable(table);
}

public static void dumpTable(HTableInterface table) throws IOException {
System.out.println("************ dumping " + table + " **************");
Scan s = new Scan();
@@ -883,7 +893,9 @@ public static void dumpTable(HTableInterface table) throws IOException {
Cell current = null;
while (cellScanner.advance()) {
current = cellScanner.current();
System.out.println(current);
System.out.println(current + "column= " +
Bytes.toString(CellUtil.cloneQualifier(current)) +
" val=" + Bytes.toString(CellUtil.cloneValue(current)));
}
}
}
@@ -913,6 +925,46 @@ public static int getRowCount(Table table, boolean isRaw) throws IOException {
return rows;
}

public static CellCount getCellCount(Table table, boolean isRaw) throws IOException {
Scan s = new Scan();
s.setRaw(isRaw);
s.setMaxVersions();

CellCount cellCount = new CellCount();
try (ResultScanner scanner = table.getScanner(s)) {
Result result = null;
while ((result = scanner.next()) != null) {
CellScanner cellScanner = result.cellScanner();
Cell current = null;
while (cellScanner.advance()) {
current = cellScanner.current();
cellCount.addCell(Bytes.toString(CellUtil.cloneRow(current)));
}
}
}
return cellCount;
}

static class CellCount {
private Map<String, Integer> rowCountMap = new HashMap<String, Integer>();

void addCell(String key){
if (rowCountMap.containsKey(key)){
rowCountMap.put(key, rowCountMap.get(key) +1);
} else {
rowCountMap.put(key, 1);
}
}

int getCellCount(String key){
if (rowCountMap.containsKey(key)){
return rowCountMap.get(key);
} else {
return 0;
}
}
}
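
A quick usage sketch (assumed, not part of the commit) of how CellCount tallies cells per row key; getCellCount(Table, boolean) above fills it from a raw scan, and assertRawCellCount() below queries it:

// Hypothetical usage; the row keys "a", "b", "c" are example values only.
CellCount counts = new CellCount();
counts.addCell("a");   // one cell seen for row key "a"
counts.addCell("a");
counts.addCell("b");
// counts.getCellCount("a") == 2
// counts.getCellCount("b") == 1
// counts.getCellCount("c") == 0 for a row key that was never seen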

public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
System.out.println("************ dumping index status for " + indexName + " **************");
@@ -1254,6 +1306,16 @@ public static void assertRawRowCount(Connection conn, TableName table, int expec
assertEquals(expectedRowCount, count);
}

public static void assertRawCellCount(Connection conn, TableName tableName,
byte[] row, int expectedCellCount)
throws SQLException, IOException{
ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
Table table = cqs.getTable(tableName.getName());
CellCount cellCount = getCellCount(table, true);
int count = cellCount.getCellCount(Bytes.toString(row));
assertEquals(expectedCellCount, count);
}

public static void assertRowExistsAtSCN(String url, String sql, long scn, boolean shouldExist)
throws SQLException {
boolean rowExists = false;
