diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index fa450625e34..d27a187dd10 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -357,20 +357,33 @@ RegionScanner getWrappedScanner(final ObserverContext c, Store store, ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - setScanOptionsForFlushesAndCompactions(options); + boolean retainAllVersions = isMaxLookbackTimeEnabled( + BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)) + || request.isMajor(); + setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); return; } long maxLookbackAge = getMaxLookbackAge(c); @@ -384,10 +397,14 @@ public void preCompactScannerOpen(ObserverContext public void preFlushScannerOpen(ObserverContext c, Store store, ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { Configuration conf = c.getEnvironment().getConfiguration(); + if (isPhoenixTableTTLEnabled(conf)) { - setScanOptionsForFlushesAndCompactions(options); + boolean retainAllVersions = isMaxLookbackTimeEnabled( + BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)); + setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); return; } + long maxLookbackAge = getMaxLookbackAge(c); if (isMaxLookbackTimeEnabled(maxLookbackAge)) { setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store, @@ -401,7 +418,9 @@ public void preMemStoreCompactionCompactScannerOpen( throws IOException { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - setScanOptionsForFlushesAndCompactions(options); + boolean retainAllVersions = isMaxLookbackTimeEnabled( + BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)); + setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); return; } long maxLookbackAge = getMaxLookbackAge(c); @@ -428,7 +447,7 @@ public void preStoreScannerOpen(ObserverContext c, Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - setScanOptionsForFlushesAndCompactions(options); + setScanOptionsForFlushesAndCompactions(store, options, true); return; } if (!storeFileScanDoesntNeedAlteration(options)) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index 3bcc2cefa81..ebe92b87414 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -76,6 +76,8 @@ public class CompactionScanner implements InternalScanner { private final byte[] emptyCF; private final byte[] emptyCQ; private final byte[] storeColumnFamily; + private final String tableName; + private final String columnFamilyName; private static Map maxLookbackMap = new ConcurrentHashMap<>(); private PhoenixLevelRowCompactor phoenixLevelRowCompactor; private HBaseLevelRowCompactor hBaseLevelRowCompactor; @@ -94,19 +96,18 @@ public CompactionScanner(RegionCoprocessorEnvironment env, this.emptyCQ = emptyCQ; this.config = env.getConfiguration(); compactionTime = EnvironmentEdgeManager.currentTimeMillis(); - this.maxLookbackInMillis = maxLookbackInMillis; - String columnFamilyName = store.getColumnFamilyName(); + columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); - String tableName = region.getRegionInfo().getTable().getNameAsString(); + tableName = region.getRegionInfo().getTable().getNameAsString(); Long overriddenMaxLookback = maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName); - maxLookbackInMillis = overriddenMaxLookback == null ? + this.maxLookbackInMillis = overriddenMaxLookback == null ? maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback); // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range // for scn queries [0, scn). This means that the maxlookback size should be // maxLookbackInMillis + 1 so that the oldest scn does not return empty row - this.maxLookbackWindowStart = maxLookbackInMillis == 0 ? - compactionTime : compactionTime - (maxLookbackInMillis + 1); + this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? + compactionTime : compactionTime - (this.maxLookbackInMillis + 1); ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); ttl = cfd.getTimeToLive(); this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000; @@ -121,6 +122,9 @@ public CompactionScanner(RegionCoprocessorEnvironment env, || localIndex; phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(); hBaseLevelRowCompactor = new HBaseLevelRowCompactor(); + LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store " + + columnFamilyName + " ttl " + ttl + "ms " + "max lookback " + + maxLookbackInMillis + "ms"); } /** @@ -155,6 +159,8 @@ public boolean next(List result, ScannerContext scannerContext) throws IOE @Override public void close() throws IOException { + LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store " + + columnFamilyName); storeScanner.close(); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java index 261ef94fe80..aa1196130fb 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java @@ -70,14 +70,14 @@ public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan scan, long currentTime = scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP ? EnvironmentEdgeManager.currentTimeMillis() : scan.getTimeRange().getMax(); ttl = env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive(); - ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000; - ttl *= 1000; // Regardless if the Phoenix Table TTL feature is disabled cluster wide or the client is // an older client and does not supply the empty column parameters, the masking should not - // be done here. - isMaskingEnabled = emptyCF != null && emptyCQ != null && - env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, + // be done here. We also disable masking when TTL is HConstants.FOREVER. + isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl != HConstants.FOREVER + && env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED); + ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 1000; + ttl *= 1000; } private void init() throws IOException { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c07e1e25c86..4ae157aba19 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -597,7 +597,14 @@ public InternalScanner run() throws Exception { InternalScanner internalScanner = scanner; if (request.isMajor()) { boolean isDisabled = false; - final String fullTableName = tableName.getNameAsString(); + boolean isMultiTenantIndexTable = false; + if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) { + isMultiTenantIndexTable = true; + } + final String fullTableName = isMultiTenantIndexTable ? + SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(), + MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) : + tableName.getNameAsString(); PTable table = null; try (PhoenixConnection conn = QueryUtil.getConnectionOnServer( compactionConfig).unwrap(PhoenixConnection.class)) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index f95918159ad..f9900fdb7bd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -22,17 +22,22 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.CompactionScanner; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.ManualEnvironmentEdge; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Assert; @@ -45,7 +50,9 @@ import java.io.IOException; import java.sql.Connection; +import java.sql.Date; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -60,6 +67,9 @@ import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN; import static org.apache.phoenix.util.TestUtil.assertTableHasTtl; import static org.apache.phoenix.util.TestUtil.assertTableHasVersions; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) @@ -316,6 +326,65 @@ public void testRecentlyDeletedRowsNotCompactedAway() throws Exception { } } + @Test(timeout=60000L) + public void testViewIndexIsCompacted() throws Exception { + if(hasTableLevelMaxLookback) { + return; + } + String baseTable = SchemaUtil.getTableName("SCHEMA1", generateUniqueName()); + String globalViewName = generateUniqueName(); + String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2", globalViewName); + String globalViewIdx = generateUniqueName(); + TableName dataTable = TableName.valueOf(baseTable); + TableName indexTable = TableName.valueOf("_IDX_" + baseTable); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("CREATE TABLE " + baseTable + + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL, PK3 INTEGER NOT NULL, " + + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK PRIMARY KEY" + + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true"); + conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName + + " AS SELECT * FROM " + baseTable); + conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " ON " + + fullGlobalViewName + " (COL1) INCLUDE (COL2)"); + + conn.createStatement().executeUpdate("UPSERT INTO " + fullGlobalViewName + + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES ('TenantId1',1, 2, 'a', 'b')"); + conn.commit(); + + String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE COL1 = 'a'"; + // Verify that query uses the global view index + ResultSet rs = conn.createStatement().executeQuery(query); + PTable table = ((PhoenixResultSet)rs).getContext().getCurrentTable().getTable(); + assertTrue(table.getSchemaName().getString().equals("SCHEMA2") && + table.getTableName().getString().equals(globalViewIdx)); + assertTrue(rs.next()); + assertEquals("b", rs.getString(1)); + assertFalse(rs.next()); + // Force a flush + flush(dataTable); + flush(indexTable); + assertRawRowCount(conn, dataTable, 1); + assertRawRowCount(conn, indexTable, 1); + // Delete the row from both tables + conn.createStatement().execute("DELETE FROM " + fullGlobalViewName + + " WHERE TENANT_ID = 'TenantId1'"); + conn.commit(); + // Force a flush + flush(dataTable); + flush(indexTable); + assertRawRowCount(conn, dataTable, 1); + assertRawRowCount(conn, indexTable, 1); + // Move change beyond the max lookback window + injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE * 1000 + 1); + EnvironmentEdgeManager.injectEdge(injectEdge); + // Major compact both tables + majorCompact(dataTable); + majorCompact(indexTable); + // Everything should have been purged by major compaction + assertRawRowCount(conn, dataTable, 0); + assertRawRowCount(conn, indexTable, 0); + } + } @Test(timeout=60000L) public void testTTLAndMaxLookbackAge() throws Exception { if(hasTableLevelMaxLookback) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 852cf8f01f8..3b6d1277e8c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -131,8 +132,8 @@ public static synchronized Collection data() { { true, false, KeepDeletedCells.FALSE, 5, 50, null}, { true, false, KeepDeletedCells.TRUE, 1, 25, null}, { true, false, KeepDeletedCells.TTL, 5, 100, null}, - { false, false, KeepDeletedCells.FALSE, 1, 100, 15}, - { false, false, KeepDeletedCells.TRUE, 5, 50, 15}, + { false, false, KeepDeletedCells.FALSE, 1, 100, 0}, + { false, false, KeepDeletedCells.TRUE, 5, 50, 0}, { false, false, KeepDeletedCells.TTL, 1, 25, 15}}); } @@ -155,7 +156,7 @@ public static synchronized Collection data() { @Test public void testMaskingAndCompaction() throws Exception { final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE; - final int maxDeleteCounter = maxLookbackAge; + final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge; final int maxCompactionCounter = ttl / 2; final int maxMaskingCounter = 2 * ttl; final byte[] rowKey = Bytes.toBytes("a"); @@ -232,10 +233,51 @@ public void testMaskingAndCompaction() throws Exception { } @Test - public void testRowSpansMultipleTTLWindows() throws Exception { - if (tableLevelMaxLooback != null) { + public void testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled() + throws Exception { + final int maxLookbackAge = tableLevelMaxLooback != null + ? tableLevelMaxLooback : MAX_LOOKBACK_AGE; + if (maxLookbackAge > 0) { return; } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String tableName = generateUniqueName(); + createTable(tableName); + conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = 0"); + conn.commit(); + final int flushCount = 10; + byte[] row = Bytes.toBytes("a"); + for (int i = 0; i < flushCount; i++) { + // Generate more row versions than the maximum cell versions for the table + int updateCount = RAND.nextInt(10) + versions; + for (int j = 0; j < updateCount; j++) { + updateRow(conn, tableName, "a"); + } + flush(TableName.valueOf(tableName)); + // At every flush, extra cell versions should be removed. + // MAX_COLUMN_INDEX table columns and one empty column will be retained for + // each row version. + TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row, + (i + 1) * (MAX_COLUMN_INDEX + 1) * versions); + } + // Run one minor compaction (in case no minor compaction has happened yet) + Admin admin = utility.getAdmin(); + admin.compact(TableName.valueOf(tableName)); + int waitCount = 0; + while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), + Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) * versions) { + // Wait for major compactions to happen + Thread.sleep(1000); + waitCount++; + if (waitCount > 30) { + Assert.fail(); + } + } + } + } + + @Test + public void testRowSpansMultipleTTLWindows() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { String tableName = generateUniqueName(); createTable(tableName); @@ -303,6 +345,16 @@ private void updateRow(Connection conn, String tableName1, String tableName2, St conn.commit(); } + private void updateRow(Connection conn, String tableName, String id) + throws SQLException { + + for (int i = 1; i <= MAX_COLUMN_INDEX; i++) { + String value = Integer.toString(RAND.nextInt(1000)); + updateColumn(conn, tableName, id, i, value); + } + conn.commit(); + } + private void compareRow(Connection conn, String tableName1, String tableName2, String id, int maxColumnIndex) throws SQLException, IOException { StringBuilder queryBuilder = new StringBuilder("SELECT "); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 79b0168f776..c15bd407c91 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -1364,13 +1364,17 @@ public static void assertRawRowCount(Connection conn, TableName table, int expec assertEquals(expectedRowCount, count); } - public static void assertRawCellCount(Connection conn, TableName tableName, - byte[] row, int expectedCellCount) - throws SQLException, IOException { + public static int getRawCellCount(Connection conn, TableName tableName, byte[] row) + throws SQLException, IOException { ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices(); Table table = cqs.getTable(tableName.getName()); CellCount cellCount = getCellCount(table, true); - int count = cellCount.getCellCount(Bytes.toString(row)); + return cellCount.getCellCount(Bytes.toString(row)); + } + public static void assertRawCellCount(Connection conn, TableName tableName, + byte[] row, int expectedCellCount) + throws SQLException, IOException { + int count = getRawCellCount(conn, tableName, row); assertEquals(expectedCellCount, count); }