Skip to content

Commit

Permalink
PHOENIX-7313 All cell versions should not be retained during flushes … (
Browse files Browse the repository at this point in the history
  • Loading branch information
kadirozde committed May 16, 2024
1 parent 9a7d79a commit 0abdcdb
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,20 +357,33 @@ RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironme
dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex);
}

public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options,
boolean retainAllVersions) {
// We want the store to give us all the deleted cells to StoreCompactionScanner
options.setKeepDeletedCells(KeepDeletedCells.TTL);
options.setTTL(HConstants.FOREVER);
options.setMaxVersions(Integer.MAX_VALUE);
options.setMinVersions(Integer.MAX_VALUE);
if (retainAllVersions) {
options.setMaxVersions(Integer.MAX_VALUE);
options.setMinVersions(Integer.MAX_VALUE);
} else {
options.setMinVersions(Math.max(Math.max(options.getMaxVersions(),
store.getColumnFamilyDescriptor().getMaxVersions()), 1));
options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
store.getColumnFamilyDescriptor().getMaxVersions()), 1));
}

}

@Override
public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
setScanOptionsForFlushesAndCompactions(options);
boolean retainAllVersions = isMaxLookbackTimeEnabled(
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
|| request.isMajor();
setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
long maxLookbackAge = getMaxLookbackAge(c);
Expand All @@ -384,10 +397,14 @@ public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment>
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();

if (isPhoenixTableTTLEnabled(conf)) {
setScanOptionsForFlushesAndCompactions(options);
boolean retainAllVersions = isMaxLookbackTimeEnabled(
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}

long maxLookbackAge = getMaxLookbackAge(c);
if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store,
Expand All @@ -401,7 +418,9 @@ public void preMemStoreCompactionCompactScannerOpen(
throws IOException {
Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
setScanOptionsForFlushesAndCompactions(options);
boolean retainAllVersions = isMaxLookbackTimeEnabled(
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions);
return;
}
long maxLookbackAge = getMaxLookbackAge(c);
Expand All @@ -428,7 +447,7 @@ public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,

Configuration conf = c.getEnvironment().getConfiguration();
if (isPhoenixTableTTLEnabled(conf)) {
setScanOptionsForFlushesAndCompactions(options);
setScanOptionsForFlushesAndCompactions(store, options, true);
return;
}
if (!storeFileScanDoesntNeedAlteration(options)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class CompactionScanner implements InternalScanner {
private final byte[] emptyCF;
private final byte[] emptyCQ;
private final byte[] storeColumnFamily;
private final String tableName;
private final String columnFamilyName;
private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>();
private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
private HBaseLevelRowCompactor hBaseLevelRowCompactor;
Expand All @@ -94,19 +96,18 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
this.emptyCQ = emptyCQ;
this.config = env.getConfiguration();
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
this.maxLookbackInMillis = maxLookbackInMillis;
String columnFamilyName = store.getColumnFamilyName();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
String tableName = region.getRegionInfo().getTable().getNameAsString();
tableName = region.getRegionInfo().getTable().getNameAsString();
Long overriddenMaxLookback =
maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName);
maxLookbackInMillis = overriddenMaxLookback == null ?
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback);
// The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range
// for scn queries [0, scn). This means that the maxlookback size should be
// maxLookbackInMillis + 1 so that the oldest scn does not return empty row
this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (maxLookbackInMillis + 1);
this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
compactionTime : compactionTime - (this.maxLookbackInMillis + 1);
ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
ttl = cfd.getTimeToLive();
this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000;
Expand All @@ -121,6 +122,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
|| localIndex;
phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store "
+ columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
+ maxLookbackInMillis + "ms");
}

/**
Expand Down Expand Up @@ -155,6 +159,8 @@ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOE

/**
 * Logs the end of the Phoenix-level compaction scan for this table/store pair
 * (paired with the "Starting" log emitted from the constructor) and closes the
 * wrapped store scanner, releasing its resources.
 *
 * @throws IOException if closing the underlying store scanner fails
 */
@Override
public void close() throws IOException {
    LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store "
            + columnFamilyName);
    storeScanner.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan scan,
long currentTime = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP ?
EnvironmentEdgeManager.currentTimeMillis() : scan.getTimeRange().getMax();
ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
ttl *= 1000;
// Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is
// an older client and does not supply the empty column parameters, the masking should not
// be done here.
isMaskingEnabled = emptyCF != null && emptyCQ != null &&
env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
// be done here. We also disable masking when TTL is HConstants.FOREVER.
isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl != HConstants.FOREVER
&& env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000;
ttl *= 1000;
}

private void init() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,14 @@ public InternalScanner run() throws Exception {
InternalScanner internalScanner = scanner;
if (request.isMajor()) {
boolean isDisabled = false;
final String fullTableName = tableName.getNameAsString();
boolean isMultiTenantIndexTable = false;
if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
isMultiTenantIndexTable = true;
}
final String fullTableName = isMultiTenantIndexTable ?
SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
tableName.getNameAsString();
PTable table = null;
try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
compactionConfig).unwrap(PhoenixConnection.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.CompactionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -45,7 +50,9 @@

import java.io.IOException;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -60,6 +67,9 @@
import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -316,6 +326,65 @@ public void testRecentlyDeletedRowsNotCompactedAway() throws Exception {
}
}

/**
 * Verifies that a shared view-index physical table (the "_IDX_" table backing global
 * view indexes on a multi-tenant base table) is actually purged by major compaction.
 * Before PHOENIX-7313, compaction resolved the "_IDX_" name directly as a Phoenix
 * table (which does not exist), so the Phoenix compaction path was skipped and
 * deleted index rows were never removed.
 */
@Test(timeout=60000L)
public void testViewIndexIsCompacted() throws Exception {
    // This scenario only applies to the cluster-level max lookback configuration;
    // skip the table-level-max-lookback parameterization.
    if(hasTableLevelMaxLookback) {
        return;
    }
    String baseTable = SchemaUtil.getTableName("SCHEMA1", generateUniqueName());
    String globalViewName = generateUniqueName();
    String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2", globalViewName);
    String globalViewIdx = generateUniqueName();
    TableName dataTable = TableName.valueOf(baseTable);
    // Global view indexes on a multi-tenant table share one physical "_IDX_" table.
    TableName indexTable = TableName.valueOf("_IDX_" + baseTable);
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        conn.createStatement().execute("CREATE TABLE " + baseTable
                + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL, PK3 INTEGER NOT NULL, "
                + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK PRIMARY KEY"
                + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true");
        conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName
                + " AS SELECT * FROM " + baseTable);
        conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " ON "
                + fullGlobalViewName + " (COL1) INCLUDE (COL2)");

        conn.createStatement().executeUpdate("UPSERT INTO " + fullGlobalViewName
                + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES ('TenantId1',1, 2, 'a', 'b')");
        conn.commit();

        String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE COL1 = 'a'";
        // Verify that query uses the global view index
        ResultSet rs = conn.createStatement().executeQuery(query);
        PTable table = ((PhoenixResultSet)rs).getContext().getCurrentTable().getTable();
        assertTrue(table.getSchemaName().getString().equals("SCHEMA2") &&
                table.getTableName().getString().equals(globalViewIdx));
        assertTrue(rs.next());
        assertEquals("b", rs.getString(1));
        assertFalse(rs.next());
        // Force a flush
        flush(dataTable);
        flush(indexTable);
        assertRawRowCount(conn, dataTable, 1);
        assertRawRowCount(conn, indexTable, 1);
        // Delete the row from both tables
        conn.createStatement().execute("DELETE FROM " + fullGlobalViewName
                + " WHERE TENANT_ID = 'TenantId1'");
        conn.commit();
        // Force a flush
        flush(dataTable);
        flush(indexTable);
        // Raw counts still include the deleted row: delete markers are retained
        // until the change ages beyond the max lookback window.
        assertRawRowCount(conn, dataTable, 1);
        assertRawRowCount(conn, indexTable, 1);
        // Move change beyond the max lookback window
        injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE * 1000 + 1);
        EnvironmentEdgeManager.injectEdge(injectEdge);
        // Major compact both tables
        majorCompact(dataTable);
        majorCompact(indexTable);
        // Everything should have been purged by major compaction
        assertRawRowCount(conn, dataTable, 0);
        assertRawRowCount(conn, indexTable, 0);
    }
}
@Test(timeout=60000L)
public void testTTLAndMaxLookbackAge() throws Exception {
if(hasTableLevelMaxLookback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -131,8 +132,8 @@ public static synchronized Collection<Object[]> data() {
{ true, false, KeepDeletedCells.FALSE, 5, 50, null},
{ true, false, KeepDeletedCells.TRUE, 1, 25, null},
{ true, false, KeepDeletedCells.TTL, 5, 100, null},
{ false, false, KeepDeletedCells.FALSE, 1, 100, 15},
{ false, false, KeepDeletedCells.TRUE, 5, 50, 15},
{ false, false, KeepDeletedCells.FALSE, 1, 100, 0},
{ false, false, KeepDeletedCells.TRUE, 5, 50, 0},
{ false, false, KeepDeletedCells.TTL, 1, 25, 15}});
}

Expand All @@ -155,7 +156,7 @@ public static synchronized Collection<Object[]> data() {
@Test
public void testMaskingAndCompaction() throws Exception {
final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
final int maxDeleteCounter = maxLookbackAge;
final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
final int maxCompactionCounter = ttl / 2;
final int maxMaskingCounter = 2 * ttl;
final byte[] rowKey = Bytes.toBytes("a");
Expand Down Expand Up @@ -232,10 +233,51 @@ public void testMaskingAndCompaction() throws Exception {
}

@Test
public void testRowSpansMultipleTTLWindows() throws Exception {
if (tableLevelMaxLooback != null) {
public void testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
throws Exception {
final int maxLookbackAge = tableLevelMaxLooback != null
? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
if (maxLookbackAge > 0) {
return;
}
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = 0");
conn.commit();
final int flushCount = 10;
byte[] row = Bytes.toBytes("a");
for (int i = 0; i < flushCount; i++) {
// Generate more row versions than the maximum cell versions for the table
int updateCount = RAND.nextInt(10) + versions;
for (int j = 0; j < updateCount; j++) {
updateRow(conn, tableName, "a");
}
flush(TableName.valueOf(tableName));
// At every flush, extra cell versions should be removed.
// MAX_COLUMN_INDEX table columns and one empty column will be retained for
// each row version.
TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row,
(i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
}
// Run one minor compaction (in case no minor compaction has happened yet)
Admin admin = utility.getAdmin();
admin.compact(TableName.valueOf(tableName));
int waitCount = 0;
while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) * versions) {
// Wait for major compactions to happen
Thread.sleep(1000);
waitCount++;
if (waitCount > 30) {
Assert.fail();
}
}
}
}

@Test
public void testRowSpansMultipleTTLWindows() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
Expand Down Expand Up @@ -303,6 +345,16 @@ private void updateRow(Connection conn, String tableName1, String tableName2, St
conn.commit();
}

/**
 * Writes a fresh random value into every data column of the row identified by
 * {@code id} in {@code tableName}, then commits the batch.
 *
 * @param conn      open Phoenix connection used for the upserts
 * @param tableName table to update
 * @param id        row key of the row to update
 * @throws SQLException if any upsert or the commit fails
 */
private void updateRow(Connection conn, String tableName, String id)
        throws SQLException {
    int columnIndex = 1;
    while (columnIndex <= MAX_COLUMN_INDEX) {
        updateColumn(conn, tableName, id, columnIndex,
                Integer.toString(RAND.nextInt(1000)));
        columnIndex++;
    }
    conn.commit();
}

private void compareRow(Connection conn, String tableName1, String tableName2, String id,
int maxColumnIndex) throws SQLException, IOException {
StringBuilder queryBuilder = new StringBuilder("SELECT ");
Expand Down
Loading

0 comments on commit 0abdcdb

Please sign in to comment.